You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/12/05 09:36:36 UTC

[2/3] incubator-carbondata git commit: modify all RDD

modify all RDD

rebase

fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1a1b18d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1a1b18d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1a1b18d0

Branch: refs/heads/master
Commit: 1a1b18d0294908d94f1fa1ba2b02e8dabb38c5a5
Parents: e642ceb
Author: jackylk <ja...@huawei.com>
Authored: Fri Dec 2 23:54:49 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Mon Dec 5 17:35:11 2016 +0800

----------------------------------------------------------------------
 .../carbondata/hadoop/CarbonInputFormat.java    |   4 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 234 +++++++++++
 .../spark/rdd/DataManagementFunc.scala          | 371 +++++++++++++++++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 310 +++++++++++++++
 .../carbondata/spark/rdd/SparkCommonEnv.scala   |  30 ++
 .../spark/sql/hive/DistributionUtil.scala       |  12 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 349 +---------------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 255 ------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 276 -------------
 .../apache/spark/mapred/SparkMapRedUtil.scala   |  32 --
 .../sql/CarbonDatasourceHadoopRelation.scala    |   2 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  18 +-
 .../scala/org/apache/spark/sql/CarbonScan.scala |   7 +-
 .../execution/command/carbonTableSchema.scala   |   9 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 397 ++-----------------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 250 ------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 312 ---------------
 .../sql/CarbonDatasourceHadoopRelation.scala    |   2 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  19 +-
 .../execution/command/carbonTableSchema.scala   |   7 +-
 20 files changed, 1024 insertions(+), 1872 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index e707c4e..66f0e3b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -166,8 +166,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return configuration.get(COLUMN_PROJECTION);
   }
 
-  public static void setCarbonReadSupport(Class<? extends CarbonReadSupport> readSupportClass,
-      Configuration configuration) {
+  public static void setCarbonReadSupport(Configuration configuration,
+      Class<? extends CarbonReadSupport> readSupportClass) {
     if (readSupportClass != null) {
       configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
new file mode 100644
index 0000000..a750b10
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.DistributionUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.datastore.block.Distributable
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.scan.expression.Expression
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+
+/**
+ * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
+ * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
+ * level filtering in driver side.
+ */
+class CarbonScanRDD[V: ClassTag](
+    @transient sc: SparkContext,
+    columnProjection: CarbonProjection,
+    filterExpression: Expression,
+    identifier: AbsoluteTableIdentifier,
+    @transient carbonTable: CarbonTable)
+  extends RDD[V](sc, Nil) {
+
+  // Query identifier: the "queryId" entry of the Spark conf when present, otherwise the
+  // value of System.nanoTime() at construction rendered as a string.
+  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
+  // Timestamp with minute resolution (yyyyMMddHHmm) taken when this RDD instance is created;
+  // used as the job tracker identifier of the Hadoop JobID and TaskAttemptID built below.
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
+  // Hadoop job id built from the tracker id and this RDD's id; declared @transient.
+  @transient private val jobId = new JobID(jobTrackerId, id)
+  // Logger named after the runtime class; declared @transient.
+  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  /**
+   * Computes the partitions of this RDD on the driver: asks the CarbonData input format for
+   * the splits of the table and groups them into Spark partitions.
+   */
+  override def getPartitions: Array[Partition] = {
+    val job = Job.getInstance(new Configuration())
+    val jobConf = job.getConfiguration
+    val inputFormat = prepareInputFormatForDriver(jobConf)
+
+    // initialise query_id for job
+    jobConf.set("query.id", queryId)
+
+    // get splits and turn them into partitions
+    distributeSplits(inputFormat.getSplits(job))
+  }
+
+  /**
+   * Turns the input splits computed on the driver into Spark partitions.
+   *
+   * The splits are treated as distributable blocks and handed to
+   * CarbonLoaderUtil.nodeBlockTaskMapping together with the active executor nodes and the
+   * default parallelism. Every non-empty group of blocks returned for a node becomes one
+   * CarbonSparkPartition wrapping a CarbonMultiBlockSplit bound to that node.
+   *
+   * @param splits input splits returned by the input format, may be empty
+   * @return one partition per non-empty task group; an empty array when there are no splits
+   */
+  private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
+    val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+    val parallelism = sparkContext.defaultParallelism
+    val result = new util.ArrayList[Partition](parallelism)
+    var noOfBlocks = 0
+    var noOfNodes = 0
+    var noOfTasks = 0
+
+    if (!splits.isEmpty) {
+      // create a list of block based on split
+      val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
+
+      // get the list of executors and map blocks to executors based on locality
+      val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+
+      // divide the blocks among the tasks of the nodes as per the data locality
+      val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+        parallelism, activeNodes.toList.asJava)
+
+      val allocationStatistic = new QueryStatistic()
+      allocationStatistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION,
+        System.currentTimeMillis)
+      statisticRecorder.recordStatisticsForDriver(allocationStatistic, queryId)
+
+      // Create Spark Partition for each task and assign blocks
+      var partitionIndex = 0
+      nodeBlockMapping.asScala.foreach { case (node, tasksOfNode) =>
+        tasksOfNode.asScala.foreach { blocksPerTask =>
+          // a task group without blocks does not get a partition
+          if (blocksPerTask.size() != 0) {
+            val taskSplits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+            val multiBlockSplit = new CarbonMultiBlockSplit(identifier, taskSplits.asJava, node)
+            result.add(new CarbonSparkPartition(id, partitionIndex, multiBlockSplit))
+            partitionIndex += 1
+          }
+        }
+      }
+
+      noOfBlocks = splits.size
+      noOfNodes = nodeBlockMapping.size
+      noOfTasks = result.size()
+
+      val identificationStatistic = new QueryStatistic()
+      identificationStatistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
+        System.currentTimeMillis)
+      statisticRecorder.recordStatisticsForDriver(identificationStatistic, queryId)
+      statisticRecorder.logStatisticsAsTableDriver()
+    }
+    logInfo(
+      s"""
+         | Identified no.of.blocks: $noOfBlocks,
+         | no.of.tasks: $noOfTasks,
+         | no.of.nodes: $noOfNodes,
+         | parallelism: $parallelism
+       """.stripMargin)
+    result.toArray(new Array[Partition](result.size()))
+  }
+
+  /**
+   * Reads the records of one partition on an executor.
+   *
+   * Builds a Hadoop task attempt context, creates and initializes the record reader of the
+   * CarbonData input format for the partition's split and exposes it as an iterator.
+   * Executor statistics are logged and the reader is released when the task completes.
+   *
+   * @param split   partition to read, must be a CarbonSparkPartition
+   * @param context Spark task context, checked for interruption on every hasNext call
+   * @return iterator over the values produced by the record reader
+   */
+  override def compute(split: Partition, context: TaskContext): Iterator[V] = {
+    // fall back to <user.dir>/conf/carbon.properties when no properties file is configured
+    val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+    if (null == carbonPropertiesFilePath) {
+      System.setProperty("carbon.properties.filepath",
+        System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
+      )
+    }
+
+    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+    val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+    val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
+    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+    val reader = format.createRecordReader(inputSplit, attemptContext)
+    reader.initialize(inputSplit, attemptContext)
+
+    val queryStartTime = System.currentTimeMillis
+
+    new Iterator[V] {
+      private var havePair = false
+      private var finished = false
+      private var closed = false
+      private var count = 0
+
+      // The reader is released both when it is exhausted and when the task completes;
+      // the flag makes sure the underlying record reader is closed only once.
+      private def closeReader(): Unit = {
+        if (!closed) {
+          closed = true
+          reader.close()
+        }
+      }
+
+      context.addTaskCompletionListener { _ =>
+        // release the reader even when logging the statistics fails
+        try {
+          logStatistics(queryStartTime, count)
+        } finally {
+          closeReader()
+        }
+      }
+
+      override def hasNext: Boolean = {
+        if (context.isInterrupted) {
+          throw new TaskKilledException
+        }
+        if (!finished && !havePair) {
+          finished = !reader.nextKeyValue
+          if (finished) {
+            closeReader()
+          }
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): V = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        val value: V = reader.getCurrentValue
+        count += 1
+        value
+      }
+    }
+  }
+
+  /**
+   * Prepares the input format used by getPartitions: registers the carbon table in the
+   * given configuration and then applies the settings shared with the executor side.
+   *
+   * @param conf Hadoop configuration to populate
+   * @return a new input format instance
+   */
+  private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
+    CarbonInputFormat.setCarbonTable(conf, carbonTable)
+    createInputFormat(conf)
+  }
+
+  /**
+   * Prepares the input format used by compute: registers the read support class provided by
+   * SparkCommonEnv in the given configuration and then applies the settings shared with
+   * the driver side.
+   *
+   * @param conf Hadoop configuration to populate
+   * @return a new input format instance
+   */
+  private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
+    CarbonInputFormat.setCarbonReadSupport(conf, SparkCommonEnv.readSupportClass)
+    createInputFormat(conf)
+  }
+
+  /**
+   * Stores the table path, the filter expression and the column projection of this RDD in
+   * the given configuration and returns a new CarbonInputFormat.
+   *
+   * @param conf Hadoop configuration to populate
+   * @return a new input format instance
+   */
+  private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
+    CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
+    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+    CarbonInputFormat.setColumnProjection(conf, columnProjection)
+    new CarbonInputFormat[V]
+  }
+
+  /**
+   * Records the executor-side statistics of one task (elapsed time since queryStartTime and
+   * number of records returned) and logs them as a table.
+   *
+   * @param queryStartTime time in milliseconds at which reading started
+   * @param recordCount    number of records handed out by the task
+   */
+  def logStatistics(queryStartTime: Long, recordCount: Int): Unit = {
+    val executorTime = new QueryStatistic()
+    executorTime.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+      System.currentTimeMillis - queryStartTime)
+    val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
+    recorder.recordStatistics(executorTime)
+
+    // result size
+    val resultSize = new QueryStatistic()
+    resultSize.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
+    recorder.recordStatistics(resultSize)
+
+    // print executor query statistics for each task_id
+    recorder.logStatisticsAsTableExecutor()
+  }
+
+  /**
+   * Returns the hosts on which the given partition should preferably be launched: the
+   * locations reported by the partition's split, without any "localhost" entry.
+   */
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val locations = split.asInstanceOf[CarbonSparkPartition].split.value.getLocations
+    locations.filterNot(_ == "localhost")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
new file mode 100644
index 0000000..28a9140
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.concurrent._
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark._
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionCallable, CompactionType}
+import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
+
+/**
+ * Common functions for data life cycle management
+ */
+object DataManagementFunc {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Marks the loads of a table for delete or update based on a date value and writes the
+   * changed load metadata back under the table metadata lock.
+   *
+   * The per-load statuses are computed by CarbonDeleteLoadByDateRDD. A load is marked for
+   * delete when every status reported for it is MARKED_FOR_DELETE, otherwise it is marked
+   * for update. The request fails with an error when the RDD returns no result, or when its
+   * only result group is keyed by the empty string.
+   *
+   * @param sqlContext          context providing the SparkContext used to run the RDD
+   * @param schema              load schema handed to the metadata writer
+   * @param databaseName        database of the table
+   * @param tableName           name of the table
+   * @param storePath           store location passed to the RDD
+   * @param dateField           date column passed to the RDD
+   * @param dateFieldActualName actual name of the date column passed to the RDD
+   * @param dateValue           date value passed to the RDD
+   */
+  def deleteLoadByDate(
+      sqlContext: SQLContext,
+      schema: CarbonDataLoadSchema,
+      databaseName: String,
+      tableName: String,
+      storePath: String,
+      dateField: String,
+      dateFieldActualName: String,
+      dateValue: String) {
+
+    // Delete the records based on data
+    val table = CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)
+    val loadMetadataDetailsArray =
+      SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
+    val resultMap = new CarbonDeleteLoadByDateRDD(
+      sqlContext.sparkContext,
+      new DeletedLoadResultImpl(),
+      databaseName,
+      table.getDatabaseName,
+      dateField,
+      dateFieldActualName,
+      dateValue,
+      table.getFactTableName,
+      tableName,
+      storePath,
+      loadMetadataDetailsArray).collect.groupBy(_._1)
+
+    val updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]()
+    if (resultMap.nonEmpty) {
+      // a single result group keyed by the empty string means the request failed
+      if (resultMap.size == 1 && resultMap.contains("")) {
+        LOGGER.error("Delete by Date request is failed")
+        sys.error("Delete by Date request is failed, potential causes " +
+            "Empty store or Invalid column type, For more details please refer logs.")
+      }
+
+      // update the status of every load for which the RDD reported a result (in place)
+      loadMetadataDetailsArray.foreach { elem =>
+        var statusList = resultMap.get(elem.getLoadName)
+        // check for the merged load folder.
+        if (statusList.isEmpty && null != elem.getMergedLoadName) {
+          statusList = resultMap.get(elem.getMergedLoadName)
+        }
+
+        statusList.foreach { statuses =>
+          elem.setModificationOrdeletionTimesStamp(CarbonLoaderUtil.readCurrentTime())
+          // mark for delete only when every reported status is MARKED_FOR_DELETE,
+          // otherwise use MARKED_FOR_UPDATE
+          if (statuses.forall(_._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) {
+            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
+          } else {
+            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE)
+            updatedLoadMetadataDetailsList += elem
+          }
+        }
+      }
+
+      // Save the load metadata
+      val carbonLock = CarbonLockFactory
+          .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+            LockUsage.METADATA_LOCK
+          )
+      try {
+        if (carbonLock.lockWithRetries()) {
+          LOGGER.info("Successfully got the table metadata file lock")
+          if (updatedLoadMetadataDetailsList.nonEmpty) {
+            // TODO: Load Aggregate tables after retention.
+          }
+
+          // write
+          // NOTE(review): the third argument is the database name here, while the other
+          // caller in this file passes the table name in that position; confirm intent.
+          CarbonLoaderUtil.writeLoadMetadata(
+            schema,
+            databaseName,
+            table.getDatabaseName,
+            loadMetadataDetailsArray.asJava
+          )
+        } else {
+          // previously this case was silent: the status changes above were not persisted
+          val lockErrorMsg = "Unable to acquire the table metadata file lock, load metadata " +
+              s"is not updated for $databaseName.$tableName"
+          LOGGER.audit(lockErrorMsg)
+          LOGGER.error(lockErrorMsg)
+        }
+      } finally {
+        if (carbonLock.unlock()) {
+          LOGGER.info("unlock the table metadata file successfully")
+        } else {
+          LOGGER.error("Unable to unlock the metadata lock")
+        }
+      }
+    } else {
+      LOGGER.error("Delete by Date request is failed")
+      LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName")
+      sys.error("Delete by Date request is failed, potential causes " +
+          "Empty store or Invalid column type, For more details please refer logs.")
+    }
+  }
+
+  /**
+   * Runs compaction rounds for as long as more than one segment is identified for merging.
+   *
+   * Each round removes partially loaded data, submits the identified segments as one
+   * compaction job, waits for it to finish, re-reads the load metadata and identifies the
+   * next segments to merge. For major compaction, segments added after the last segment
+   * known at the start are excluded from the following rounds.
+   *
+   * @param carbonLoadModel load model of the table, its load metadata is refreshed per round
+   * @param storePath       store location
+   * @param compactionModel compaction size, type and table information
+   * @param executor        executor service running the compaction job
+   * @param sqlContext      SQL context handed to the compaction job
+   * @param kettleHomePath  kettle home handed to the compaction job
+   * @param storeLocation   store location handed to the compaction job
+   */
+  def executeCompaction(carbonLoadModel: CarbonLoadModel,
+      storePath: String,
+      compactionModel: CompactionModel,
+      executor: ExecutorService,
+      sqlContext: SQLContext,
+      kettleHomePath: String,
+      storeLocation: String): Unit = {
+    // sorted copy of the segments known before the first round
+    val sortedSegments: util.List[LoadMetadataDetails] =
+      new util.ArrayList[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails)
+    CarbonDataMergerUtil.sortSegments(sortedSegments)
+
+    var segList = carbonLoadModel.getLoadMetadataDetails
+    var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+      storePath,
+      carbonLoadModel,
+      compactionModel.compactionSize,
+      segList,
+      compactionModel.compactionType)
+
+    while (loadsToMerge.size() > 1) {
+      val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
+      deletePartialLoadsInCompaction(carbonLoadModel)
+
+      val futureList: util.List[Future[Void]] =
+        new util.ArrayList[Future[Void]](CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)
+      scanSegmentsAndSubmitJob(futureList,
+        loadsToMerge,
+        executor,
+        storePath,
+        sqlContext,
+        compactionModel,
+        kettleHomePath,
+        carbonLoadModel,
+        storeLocation)
+
+      // wait for the submitted compaction job, failures are logged and passed on
+      try {
+        futureList.asScala.foreach(_.get())
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
+          throw e
+      }
+
+      // scan again and determine if anything is there to merge again.
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+      val refreshedSegments = carbonLoadModel.getLoadMetadataDetails
+      // in case of major compaction we will scan only once and come out as it will keep
+      // on doing major for the new loads also.
+      // excluding the newly added segments.
+      segList = if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
+        CarbonDataMergerUtil.filterOutNewlyAddedSegments(refreshedSegments, lastSegment)
+      } else {
+        refreshedSegments
+      }
+      loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+        storePath,
+        carbonLoadModel,
+        compactionModel.compactionSize,
+        segList,
+        compactionModel.compactionType)
+    }
+  }
+
+  /**
+   * Submits one compaction job for the given loads to the executor and remembers its future.
+   *
+   * @param futureList      receives the future of the submitted job
+   * @param loadsToMerge    loads identified for merging, each one is logged
+   * @param executor        executor service the job is submitted to
+   * @param storePath       store location handed to the job
+   * @param sqlContext      SQL context handed to the job
+   * @param compactionModel source of the carbon table, table creation time and compaction type
+   * @param kettleHomePath  kettle home handed to the job
+   * @param carbonLoadModel load model handed to the job
+   * @param storeLocation   store location handed to the job
+   */
+  def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
+      loadsToMerge: util.List[LoadMetadataDetails],
+      executor: ExecutorService,
+      storePath: String,
+      sqlContext: SQLContext,
+      compactionModel: CompactionModel,
+      kettleHomePath: String,
+      carbonLoadModel: CarbonLoadModel,
+      storeLocation: String): Unit = {
+
+    for (seg <- loadsToMerge.asScala) {
+      LOGGER.info("loads identified for merge is " + seg.getLoadName)
+    }
+
+    val compactionCallableModel = CompactionCallableModel(storePath,
+      carbonLoadModel,
+      storeLocation,
+      compactionModel.carbonTable,
+      kettleHomePath,
+      compactionModel.tableCreationTime,
+      loadsToMerge,
+      sqlContext,
+      compactionModel.compactionType)
+
+    val future: Future[Void] = executor.submit(new CompactionCallable(compactionCallableModel))
+    futureList.add(future)
+  }
+
+  /**
+   * Fills the given load model from the table: aggregate tables, table and database name,
+   * load schema, store path, current load metadata and the fact time stamp.
+   *
+   * @param storePath          store location used to read the load metadata
+   * @param table              table the model is prepared for
+   * @param newCarbonLoadModel model to populate, modified in place
+   */
+  def prepareCarbonLoadModel(storePath: String,
+      table: CarbonTable,
+      newCarbonLoadModel: CarbonLoadModel): Unit = {
+    newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
+    newCarbonLoadModel.setTableName(table.getFactTableName)
+    // Need to fill dimension relation
+    newCarbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
+
+    val tableIdentifier = table.getCarbonTableIdentifier
+    newCarbonLoadModel.setTableName(tableIdentifier.getTableName)
+    newCarbonLoadModel.setDatabaseName(tableIdentifier.getDatabaseName)
+    newCarbonLoadModel.setStorePath(table.getStorePath)
+
+    CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath)
+    newCarbonLoadModel.setFactTimeStamp(CarbonLoaderUtil.readCurrentTime())
+  }
+
+  /**
+   * Removes partially loaded data of the table, i.e. segment folders that are present in
+   * the store without an entry in the status. Failures are logged and not passed on.
+   *
+   * @param carbonLoadModel load model of the table to clean up
+   */
+  def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
+    try {
+      CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+    } catch {
+      case e: Exception =>
+        val prefix = "Exception in compaction thread while clean up of stale segments"
+        LOGGER.error(s"$prefix ${ e.getMessage }")
+    }
+  }
+
+  /**
+   * Deletes the load folders that are due for deletion and, when that changed anything,
+   * merges the result into the latest table status under the table status lock.
+   *
+   * Nothing happens when LoadMetadataUtil reports that no load deletion is required.
+   *
+   * @param carbonLoadModel load model of the table
+   * @param table           table whose identifier is used for the table status lock
+   * @param storePath       store location
+   * @param isForceDeletion passed through to the load folder deletion
+   * @throws Exception when the table status lock cannot be acquired
+   */
+  def deleteLoadsAndUpdateMetadata(
+      carbonLoadModel: CarbonLoadModel,
+      table: CarbonTable,
+      storePath: String,
+      isForceDeletion: Boolean): Unit = {
+    if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
+      val loadMetadataFilePath =
+        CarbonLoaderUtil.extractLoadMetadataFileLocation(carbonLoadModel)
+      val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
+      val carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
+        table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.TABLE_STATUS_LOCK)
+
+      // Delete marked loads
+      val isUpdationRequired = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+        carbonLoadModel, storePath, isForceDeletion, details)
+
+      if (isUpdationRequired) {
+        try {
+          // Update load metadata file after cleaning deleted nodes
+          if (!carbonTableStatusLock.lockWithRetries()) {
+            val tableRef = s"${ carbonLoadModel.getDatabaseName }." +
+                s"${ carbonLoadModel.getTableName }"
+            val errorMsg = "Clean files request is failed for " + tableRef +
+                ". Not able to acquire the table status lock due to other operation " +
+                "running in the background."
+            LOGGER.audit(errorMsg)
+            LOGGER.error(errorMsg)
+            throw new Exception(errorMsg + " Please try after some time.")
+          }
+          LOGGER.info("Table status lock has been successfully acquired.")
+
+          // read latest table status again.
+          val latestMetadata = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
+
+          // update the metadata details from old to new status.
+          val latestStatus =
+            CarbonLoaderUtil.updateLoadMetadataFromOldToNew(details, latestMetadata)
+
+          CarbonLoaderUtil.writeLoadMetadata(
+            carbonLoadModel.getCarbonDataLoadSchema,
+            carbonLoadModel.getDatabaseName,
+            carbonLoadModel.getTableName,
+            latestStatus)
+        } finally {
+          CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
+        }
+      }
+    }
+  }
+
+  /**
+   * Cleans the files of a table: acquires the clean files lock and force-deletes the loads
+   * that are due for deletion, updating the table status accordingly.
+   *
+   * @param sc              Spark context (not used by this method)
+   * @param carbonLoadModel load model identifying the database and table
+   * @param storePath       store location
+   * @throws Exception when the clean files lock cannot be acquired
+   */
+  def cleanFiles(
+      sc: SparkContext,
+      carbonLoadModel: CarbonLoadModel,
+      storePath: String) {
+    val tableKey = carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName
+    val table = CarbonMetadata.getInstance.getCarbonTable(tableKey)
+    val carbonCleanFilesLock = CarbonLockFactory.getCarbonLockObj(
+      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.CLEAN_FILES_LOCK)
+    try {
+      if (!carbonCleanFilesLock.lockWithRetries()) {
+        val errorMsg = "Clean files request is failed for " +
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            ". Not able to acquire the clean files lock due to another clean files " +
+            "operation is running in the background."
+        LOGGER.audit(errorMsg)
+        LOGGER.error(errorMsg)
+        throw new Exception(errorMsg + " Please try after some time.")
+      }
+      LOGGER.info("Clean files lock has been successfully acquired.")
+      deleteLoadsAndUpdateMetadata(carbonLoadModel,
+        table,
+        storePath,
+        isForceDeletion = true)
+    } finally {
+      CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
new file mode 100644
index 0000000..32770f7
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.common.CarbonIterator
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+// Serializable holder for a Hadoop Configuration; value is written and read back by hand below.
+class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
+  // Lazy on purpose: transient fields are not restored by deserialization, so a plain val
+  // would be null inside readObject and its error handlers would throw NullPointerException.
+  @transient
+  private lazy val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // Runs body, logging any failure and rethrowing it as (or wrapped in) an IOException.
+  private def tryOrIOException(body: => Unit): Unit =
+    try {
+      body
+    } catch {
+      case e: IOException =>
+        LOGGER.error(e, "Exception encountered")
+        throw e
+      case NonFatal(e) =>
+        LOGGER.error(e, "Exception encountered")
+        throw new IOException(e)
+    }
+
+  // value is transient, so its entries are written explicitly after the default fields.
+  private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
+    out.defaultWriteObject()
+    value.write(out)
+  }
+
+  // Restores value from the stream into a Configuration created with loadDefaults = false.
+  private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
+    value = new Configuration(false)
+    value.readFields(in)
+  }
+}
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+    sc: SparkContext,
+    result: DataLoadResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    loadCount: Integer,
+    blocksGroupBy: Array[(String, Array[BlockDetails])],
+    isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val confBroadcast =
+    sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+    if (isTableSplitPartition) {
+      // table split partition: one partition per table split
+      val tableSplits: Array[TableSplit] = if (carbonLoadModel.isDirectLoad) {
+        CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
+      } else {
+        CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+          carbonLoadModel.getTableName, null)
+      }
+
+      tableSplits.zipWithIndex.map { case (tableSplit, splitIndex) =>
+        // only one group is expected to carry the split's partition unique id; use the first
+        val sameId = blocksGroupBy.filter { case (groupId, _) =>
+          groupId == tableSplit.getPartition.getUniqueID
+        }
+        val blocksDetails: Array[BlockDetails] = sameId(0)._2
+        new CarbonTableSplitPartition(id, splitIndex, tableSplit, blocksDetails)
+      }
+    } else {
+      // node partition: one partition per (node, blocks) group, in blocksGroupBy order
+      blocksGroupBy.zipWithIndex.map { case ((node, blocks), groupIndex) =>
+        new CarbonNodePartition(id, groupIndex, node, blocks)
+      }
+    }
+  }
+
+  override def checkpoint() {
+    // Intentionally empty: like a Hadoop RDD, this RDD should not be checkpointed.
+  }
+
+  // Performs the load for theSplit eagerly; the returned iterator yields one status pair.
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val iter = new Iterator[(K, V)] {
+      var partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      var model: CarbonLoadModel = _
+      var uniqueLoadStatusId =
+        carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
+      try {
+        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+        // order matters: getInputIterators assigns `model`, which the loader and executor use
+        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+        val recordReaders = getInputIterators
+        val loader = new SparkPartitionLoader(model,
+          theSplit.index,
+          null,
+          null,
+          loadCount,
+          loadMetadataDetails)
+        // Initialize to set carbon properties
+        loader.initialize()
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        new DataLoadExecutor().execute(model,
+          loader.storeLocation,
+          recordReaders)
+      } catch {
+        case e: BadRecordFoundException =>
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          logInfo("Bad Record Found")
+        case e: Exception =>
+          logInfo("DataLoad failure", e)
+          LOGGER.error(e)
+          throw e
+      }
+
+      def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
+        val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, theSplit.index, 0)
+        var configuration: Configuration = confBroadcast.value.value
+        if (configuration == null) {
+          configuration = new Configuration()
+        }
+        configureCSVInputFormat(configuration)
+        val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId)
+        val format = new CSVInputFormat
+        if (isTableSplitPartition) {
+          // table split partition: one record reader per entry of partitionBlocksDetail
+          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+          logInfo("Input split: " + split.serializableHadoopSplit.value)
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          if (carbonLoadModel.isDirectLoad) {
+            model = carbonLoadModel.getCopyWithPartition(
+                split.serializableHadoopSplit.value.getPartition.getUniqueID,
+                split.serializableHadoopSplit.value.getPartition.getFilesPath,
+                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(
+                split.serializableHadoopSplit.value.getPartition.getUniqueID)
+          }
+          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+          StandardLogService.setThreadName(partitionID, null)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+              partitionID, split.partitionBlocksDetail.length)
+          val readers =
+          split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
+          readers.zipWithIndex.map { case (reader, index) =>
+            new RecordReaderIterator(reader,
+              split.partitionBlocksDetail(index),
+              hadoopAttemptContext)
+          }
+        } else {
+          // node partition: one record reader per entry of nodeBlocksDetail
+          val split = theSplit.asInstanceOf[CarbonNodePartition]
+          logInfo("Input split: " + split.serializableHadoopSplit)
+          logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+              split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+          val blocksID = gernerateBlocksID
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          if (carbonLoadModel.isDirectLoad) {
+            val filelist: java.util.List[String] = new java.util.ArrayList[String](
+                CarbonCommonConstants.CONSTANT_SIZE_TEN)
+            CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+            model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(partitionID)
+          }
+          StandardLogService.setThreadName(blocksID, null)
+          val readers =
+            split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
+          readers.zipWithIndex.map { case (reader, index) =>
+            new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
+          }
+        }
+      }
+
+      def configureCSVInputFormat(configuration: Configuration): Unit = {
+        CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
+        CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
+        CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
+        CSVInputFormat.setHeaderExtractionEnabled(configuration,
+          carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
+        CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
+        CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
+      }
+
+      /**
+       * Builds a label of the form database_table[_partitionUniqueId]_randomUUID.
+       *
+       * @return the generated label; the node-partition branch uses it as the thread name
+       */
+      def gernerateBlocksID: String = {
+        if (isTableSplitPartition) {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+            .getPartition.getUniqueID + "_" + UUID.randomUUID()
+        } else {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          UUID.randomUUID()
+        }
+      }
+
+      var finished = false
+      // single-shot iterator: hasNext turns false once next() has produced its one element
+      override def hasNext: Boolean = {
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+      }
+    }
+    iter
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    if (isTableSplitPartition) {
+      // table split partition: the locations reported by the wrapped table split
+      val tablePartition = split.asInstanceOf[CarbonTableSplitPartition]
+      val splitLocations = tablePartition.serializableHadoopSplit.value.getLocations.asScala
+      splitLocations
+    } else {
+      // node partition: the partition's own node first, followed by at most two other
+      // nodes, ranked by how many of the partition's blocks list them as a location
+      val nodePartition = split.asInstanceOf[CarbonNodePartition]
+      val primary: Seq[String] = List(nodePartition.serializableHadoopSplit)
+      logInfo("Preferred Location for split : " + primary.head)
+
+      val blockCounts = new util.LinkedHashMap[String, Integer]()
+      for (block <- nodePartition.blocksDetails; host <- block.getLocations) {
+        val isPrimary = primary.exists(host.equalsIgnoreCase(_))
+        if (!isPrimary) {
+          val seenSoFar = blockCounts.get(host)
+          if (seenSoFar != null) {
+            blockCounts.put(host, seenSoFar + 1)
+          } else {
+            blockCounts.put(host, 1)
+          }
+        }
+      }
+
+      // sortWith is stable, so nodes with equal counts keep their first-seen order
+      val ranked = blockCounts.entrySet().asScala.toSeq.sortWith { (left, right) =>
+        left.getValue > right.getValue
+      }
+
+      val secondaryNodes = ranked.map(entry => entry.getKey).take(2)
+      primary ++ secondaryNodes
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala
new file mode 100644
index 0000000..bf614b1
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+
+// Breaks the cyclic dependency between carbon-spark-common and the carbon-spark /
+// carbon-spark2 modules: values that differ between those two modules are set here.
+object SparkCommonEnv {
+  // starts as null (`= _`) until it is assigned; the assignment is not in this object
+  var readSupportClass: Class[_ <: CarbonReadSupport[_]] = _
+
+  // starts as 0 (`= _`) until it is assigned; the assignment is not in this object
+  var numExistingExecutors: Int = _
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 8b1a2bb..5b9353e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.carbon.datastore.block.Distributable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.SparkCommonEnv
 
 object DistributionUtil {
   @transient
@@ -215,9 +216,6 @@ object DistributionUtil {
     nodes.distinct.toSeq
   }
 
-  // Hack for spark2 integration
-  var numExistingExecutors: Int = _
-
   /**
    * Requesting the extra executors other than the existing ones.
    *
@@ -233,13 +231,11 @@ object DistributionUtil {
       hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean = {
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        LOGGER
-          .info(
+        LOGGER.info(
             s"number of required executors are = $requiredExecutors and existing executors are = " +
-            s"$numExistingExecutors")
+            s"${SparkCommonEnv.numExistingExecutors}")
         if (requiredExecutors > 0) {
-          LOGGER
-            .info(s"Requesting total executors: $requiredExecutors")
+          LOGGER.info(s"Requesting total executors: $requiredExecutors")
           b.requestTotalExecutors(requiredExecutors, localityAwareTasks, hostToLocalTaskCount)
         }
         true

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index e5eb78a..8463477 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -40,6 +40,7 @@ import org.apache.spark.util.SparkUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
@@ -64,105 +65,6 @@ object CarbonDataRDDFactory {
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def deleteLoadByDate(
-      sqlContext: SQLContext,
-      schema: CarbonDataLoadSchema,
-      databaseName: String,
-      tableName: String,
-      storePath: String,
-      dateField: String,
-      dateFieldActualName: String,
-      dateValue: String) {
-
-    val sc = sqlContext
-    // Delete the records based on data
-    val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-      .getCarbonTable(databaseName + "_" + tableName)
-    val loadMetadataDetailsArray =
-      SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
-    val resultMap = new CarbonDeleteLoadByDateRDD(
-      sc.sparkContext,
-      new DeletedLoadResultImpl(),
-      databaseName,
-      table.getDatabaseName,
-      dateField,
-      dateFieldActualName,
-      dateValue,
-      table.getFactTableName,
-      tableName,
-      storePath,
-      loadMetadataDetailsArray).collect.groupBy(_._1)
-
-    var updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]()
-    if (resultMap.nonEmpty) {
-      if (resultMap.size == 1) {
-        if (resultMap.contains("")) {
-          LOGGER.error("Delete by Date request is failed")
-          sys.error("Delete by Date request is failed, potential causes " +
-                    "Empty store or Invalid column type, For more details please refer logs.")
-        }
-      }
-      val updatedloadMetadataDetails = loadMetadataDetailsArray.map { elem => {
-        var statusList = resultMap.get(elem.getLoadName)
-        // check for the merged load folder.
-        if (statusList.isEmpty && null != elem.getMergedLoadName) {
-          statusList = resultMap.get(elem.getMergedLoadName)
-        }
-
-        if (statusList.isDefined) {
-          elem.setModificationOrdeletionTimesStamp(CarbonLoaderUtil.readCurrentTime())
-          // if atleast on CarbonCommonConstants.MARKED_FOR_UPDATE status exist,
-          // use MARKED_FOR_UPDATE
-          if (statusList.get
-            .forall(status => status._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) {
-            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
-          } else {
-            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE)
-            updatedLoadMetadataDetailsList += elem
-          }
-          elem
-        } else {
-          elem
-        }
-      }
-
-      }
-
-      // Save the load metadata
-      val carbonLock = CarbonLockFactory
-        .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-          LockUsage.METADATA_LOCK
-        )
-      try {
-        if (carbonLock.lockWithRetries()) {
-          LOGGER.info("Successfully got the table metadata file lock")
-          if (updatedLoadMetadataDetailsList.nonEmpty) {
-            // TODO: Load Aggregate tables after retention.
-          }
-
-          // write
-          CarbonLoaderUtil.writeLoadMetadata(
-            schema,
-            databaseName,
-            table.getDatabaseName,
-            updatedloadMetadataDetails.asJava
-          )
-        }
-      } finally {
-        if (carbonLock.unlock()) {
-          LOGGER.info("unlock the table metadata file successfully")
-        } else {
-          LOGGER.error("Unable to unlock the metadata lock")
-        }
-      }
-    } else {
-      LOGGER.error("Delete by Date request is failed")
-      LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName")
-      sys.error("Delete by Date request is failed, potential causes " +
-                "Empty store or Invalid column type, For more details please refer logs.")
-    }
-  }
-
   def alterTableForCompaction(sqlContext: SQLContext,
       alterTableModel: AlterTableModel,
       carbonLoadModel: CarbonLoadModel,
@@ -306,118 +208,6 @@ object CarbonDataRDDFactory {
     }
   }
 
-  def executeCompaction(carbonLoadModel: CarbonLoadModel,
-      storePath: String,
-      compactionModel: CompactionModel,
-      executor: ExecutorService,
-      sqlContext: SQLContext,
-      kettleHomePath: String,
-      storeLocation: String): Unit = {
-    val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
-      carbonLoadModel.getLoadMetadataDetails
-    )
-    CarbonDataMergerUtil.sortSegments(sortedSegments)
-
-    var segList = carbonLoadModel.getLoadMetadataDetails
-    var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-      storePath,
-      carbonLoadModel,
-      compactionModel.compactionSize,
-      segList,
-      compactionModel.compactionType
-    )
-    while (loadsToMerge.size() > 1) {
-      val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
-      deletePartialLoadsInCompaction(carbonLoadModel)
-      val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
-        CarbonCommonConstants
-          .DEFAULT_COLLECTION_SIZE
-      )
-
-      scanSegmentsAndSubmitJob(futureList,
-        loadsToMerge,
-        executor,
-        storePath,
-        sqlContext,
-        compactionModel,
-        kettleHomePath,
-        carbonLoadModel,
-        storeLocation
-      )
-
-      try {
-
-        futureList.asScala.foreach(future => {
-          future.get
-        }
-        )
-      } catch {
-        case e: Exception =>
-          LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
-          throw e
-      }
-
-
-      // scan again and determine if anything is there to merge again.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
-      segList = carbonLoadModel.getLoadMetadataDetails
-      // in case of major compaction we will scan only once and come out as it will keep
-      // on doing major for the new loads also.
-      // excluding the newly added segments.
-      if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
-
-        segList = CarbonDataMergerUtil
-          .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
-      }
-      loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-        storePath,
-        carbonLoadModel,
-        compactionModel.compactionSize,
-        segList,
-        compactionModel.compactionType
-      )
-    }
-  }
-
-  /**
-   * This will submit the loads to be merged into the executor.
-   *
-   * @param futureList
-   */
-  def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
-      loadsToMerge: util
-      .List[LoadMetadataDetails],
-      executor: ExecutorService,
-      storePath: String,
-      sqlContext: SQLContext,
-      compactionModel: CompactionModel,
-      kettleHomePath: String,
-      carbonLoadModel: CarbonLoadModel,
-      storeLocation: String): Unit = {
-
-    loadsToMerge.asScala.foreach(seg => {
-      LOGGER.info("loads identified for merge is " + seg.getLoadName)
-    }
-    )
-
-    val compactionCallableModel = CompactionCallableModel(storePath,
-      carbonLoadModel,
-      storeLocation,
-      compactionModel.carbonTable,
-      kettleHomePath,
-      compactionModel.tableCreationTime,
-      loadsToMerge,
-      sqlContext,
-      compactionModel.compactionType
-    )
-
-    val future: Future[Void] = executor
-      .submit(new CompactionCallable(compactionCallableModel
-      )
-      )
-    futureList.add(future)
-  }
-
   def startCompactionThreads(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
@@ -447,7 +237,7 @@ object CarbonDataRDDFactory {
           var triggeredCompactionStatus = false
           var exception: Exception = null
           try {
-            executeCompaction(carbonLoadModel: CarbonLoadModel,
+            DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel,
               storePath: String,
               compactionModel: CompactionModel,
               executor, sqlContext, kettleHomePath, storeLocation
@@ -479,7 +269,7 @@ object CarbonDataRDDFactory {
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
 
               val newCarbonLoadModel = new CarbonLoadModel()
-              prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
+              DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
               val tableCreationTime = CarbonEnv.get.carbonMetastore
                 .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
                   newCarbonLoadModel.getTableName
@@ -496,7 +286,7 @@ object CarbonDataRDDFactory {
               )
               // proceed for compaction
               try {
-                executeCompaction(newCarbonLoadModel,
+                DataManagementFunc.executeCompaction(newCarbonLoadModel,
                   newCarbonLoadModel.getStorePath,
                   newcompactionModel,
                   executor, sqlContext, kettleHomePath, storeLocation
@@ -534,7 +324,7 @@ object CarbonDataRDDFactory {
           }
         } finally {
           executor.shutdownNow()
-          deletePartialLoadsInCompaction(carbonLoadModel)
+          DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel)
           compactionLock.unlock()
         }
       }
@@ -544,41 +334,11 @@ object CarbonDataRDDFactory {
     compactionThread.run()
   }
 
-  def prepareCarbonLoadModel(storePath: String,
-      table: CarbonTable,
-      newCarbonLoadModel: CarbonLoadModel): Unit = {
-    newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
-    newCarbonLoadModel.setTableName(table.getFactTableName)
-    val dataLoadSchema = new CarbonDataLoadSchema(table)
-    // Need to fill dimension relation
-    newCarbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
-    newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
-    newCarbonLoadModel.setStorePath(table.getStorePath)
-    CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath)
-    val loadStartTime = CarbonLoaderUtil.readCurrentTime()
-    newCarbonLoadModel.setFactTimeStamp(loadStartTime)
-  }
-
-  def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
-    // Deleting the any partially loaded data if present.
-    // in some case the segment folder which is present in store will not have entry in
-    // status.
-    // so deleting those folders.
-    try {
-      CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
-                     s" ${ e.getMessage }")
-    }
-  }
-
   def loadCarbonData(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
       kettleHomePath: String,
-      columinar: Boolean,
+      columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
       useKettle: Boolean,
       dataFrame: Option[DataFrame] = None): Unit = {
@@ -673,7 +433,7 @@ object CarbonDataRDDFactory {
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
-      deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
+      DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
         isForceDeletion = false)
       if (null == carbonLoadModel.getLoadMetadataDetails) {
         CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -800,7 +560,6 @@ object CarbonDataRDDFactory {
           }
           val jobContext = new Job(hadoopConfiguration)
           val rawSplits = inputFormat.getSplits(jobContext).toArray
-          val result = new Array[Partition](rawSplits.size)
           val blockList = rawSplits.map { inputSplit =>
             val fileSplit = inputSplit.asInstanceOf[FileSplit]
             new TableBlockInfo(fileSplit.getPath.toString,
@@ -855,7 +614,7 @@ object CarbonDataRDDFactory {
             carbonLoadModel,
             storePath,
             kettleHomePath,
-            columinar,
+            columnar,
             currentLoadCount,
             tableCreationTime,
             schemaLastUpdatedTime,
@@ -887,7 +646,7 @@ object CarbonDataRDDFactory {
             carbonLoadModel,
             storePath,
             kettleHomePath,
-            columinar,
+            columnar,
             currentLoadCount,
             tableCreationTime,
             schemaLastUpdatedTime,
@@ -1003,94 +762,4 @@ object CarbonDataRDDFactory {
 
   }
 
-  def deleteLoadsAndUpdateMetadata(
-      carbonLoadModel: CarbonLoadModel,
-      table: CarbonTable,
-      storePath: String,
-      isForceDeletion: Boolean): Unit = {
-    if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
-      val loadMetadataFilePath = CarbonLoaderUtil
-        .extractLoadMetadataFileLocation(carbonLoadModel)
-      val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
-      val carbonTableStatusLock = CarbonLockFactory
-        .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-          LockUsage.TABLE_STATUS_LOCK)
-
-      // Delete marked loads
-      val isUpdationRequired = DeleteLoadFolders
-        .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath, isForceDeletion, details)
-
-      if (isUpdationRequired) {
-        try {
-          // Update load metadate file after cleaning deleted nodes
-          if (carbonTableStatusLock.lockWithRetries()) {
-            LOGGER.info("Table status lock has been successfully acquired.")
-
-            // read latest table status again.
-            val latestMetadata = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
-
-            // update the metadata details from old to new status.
-            val latestStatus = CarbonLoaderUtil
-                .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
-            CarbonLoaderUtil.writeLoadMetadata(
-              carbonLoadModel.getCarbonDataLoadSchema,
-              carbonLoadModel.getDatabaseName,
-              carbonLoadModel.getTableName, latestStatus)
-          } else {
-            val errorMsg = "Clean files request is failed for " +
-                           s"${ carbonLoadModel.getDatabaseName }." +
-                           s"${ carbonLoadModel.getTableName }" +
-                           ". Not able to acquire the table status lock due to other operation " +
-                           "running in the background."
-            LOGGER.audit(errorMsg)
-            LOGGER.error(errorMsg)
-            throw new Exception(errorMsg + " Please try after some time.")
-          }
-        } finally {
-          CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
-        }
-      }
-    }
-  }
-
-  def dropTable(
-      sc: SparkContext,
-      schema: String,
-      table: String) {
-    val v: Value[Array[Object]] = new ValueImpl()
-    new CarbonDropTableRDD(sc, v, schema, table).collect
-  }
-
-  def cleanFiles(
-      sc: SparkContext,
-      carbonLoadModel: CarbonLoadModel,
-      storePath: String) {
-    val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-      .getCarbonTable(carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName)
-    val carbonCleanFilesLock = CarbonLockFactory
-      .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        LockUsage.CLEAN_FILES_LOCK
-      )
-    try {
-      if (carbonCleanFilesLock.lockWithRetries()) {
-        LOGGER.info("Clean files lock has been successfully acquired.")
-        deleteLoadsAndUpdateMetadata(carbonLoadModel,
-          table,
-          storePath,
-          isForceDeletion = true)
-      } else {
-        val errorMsg = "Clean files request is failed for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                       ". Not able to acquire the clean files lock due to another clean files " +
-                       "operation is running in the background."
-        LOGGER.audit(errorMsg)
-        LOGGER.error(errorMsg)
-        throw new Exception(errorMsg + " Please try after some time.")
-
-      }
-    } finally {
-      CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
deleted file mode 100644
index cae99d1..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.Date
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID}
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext, TaskKilledException}
-import org.apache.spark.mapred.CarbonHadoopMapReduceUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.hive.DistributionUtil
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.core.carbon.datastore.block.Distributable
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport
-import org.apache.carbondata.scan.expression.Expression
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-
-class CarbonSparkPartition(
-    val rddId: Int,
-    val idx: Int,
-    @transient val multiBlockSplit: CarbonMultiBlockSplit)
-    extends Partition {
-
-  val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
-
-  override val index: Int = idx
-
-  override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
- * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
- * level filtering in driver side.
- */
-class CarbonScanRDD[V: ClassTag](
-    @transient sc: SparkContext,
-    columnProjection: Seq[Attribute],
-    filterExpression: Expression,
-    identifier: AbsoluteTableIdentifier,
-    @transient carbonTable: CarbonTable)
-    extends RDD[V](sc, Nil)
-        with CarbonHadoopMapReduceUtil
-        with Logging {
-
-  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new Date())
-  }
-
-  @transient private val jobId = new JobID(jobTrackerId, id)
-  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
-  override def getPartitions: Array[Partition] = {
-    val job = Job.getInstance(new Configuration())
-    val format = prepareInputFormatForDriver(job.getConfiguration)
-
-    // initialise query_id for job
-    job.getConfiguration.set("query.id", queryId)
-
-    // get splits
-    val splits = format.getSplits(job)
-    val result = distributeSplits(splits)
-    result
-  }
-
-  private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
-    // this function distributes the split based on following logic:
-    // 1. based on data locality, to make split balanced on all available nodes
-    // 2. if the number of split for one
-
-    var statistic = new QueryStatistic()
-    val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val parallelism = sparkContext.defaultParallelism
-    val result = new util.ArrayList[Partition](parallelism)
-    var noOfBlocks = 0
-    var noOfNodes = 0
-    var noOfTasks = 0
-
-    if (!splits.isEmpty) {
-      // create a list of block based on split
-      val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
-
-      // get the list of executors and map blocks to executors based on locality
-      val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
-
-      // divide the blocks among the tasks of the nodes as per the data locality
-      val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
-        parallelism, activeNodes.toList.asJava)
-
-      statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
-      statisticRecorder.recordStatisticsForDriver(statistic, queryId)
-      statistic = new QueryStatistic()
-
-      var i = 0
-      // Create Spark Partition for each task and assign blocks
-      nodeBlockMapping.asScala.foreach { case (node, blockList) =>
-        blockList.asScala.foreach { blocksPerTask =>
-          val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
-          if (blocksPerTask.size() != 0) {
-            val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node)
-            val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
-            result.add(partition)
-            i += 1
-          }
-        }
-      }
-
-      noOfBlocks = splits.size
-      noOfNodes = nodeBlockMapping.size
-      noOfTasks = result.size()
-
-      statistic = new QueryStatistic()
-      statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
-        System.currentTimeMillis)
-      statisticRecorder.recordStatisticsForDriver(statistic, queryId)
-      statisticRecorder.logStatisticsAsTableDriver()
-    }
-    logInfo(
-      s"""
-         | Identified no.of.blocks: $noOfBlocks,
-         | no.of.tasks: $noOfTasks,
-         | no.of.nodes: $noOfNodes,
-         | parallelism: $parallelism
-       """.stripMargin)
-    result.toArray(new Array[Partition](result.size()))
-  }
-
-  override def compute(split: Partition, context: TaskContext): Iterator[V] = {
-    val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
-    if (null == carbonPropertiesFilePath) {
-      System.setProperty("carbon.properties.filepath",
-        System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
-      )
-    }
-
-    val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-    val attemptContext = newTaskAttemptContext(new Configuration(), attemptId)
-    val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
-    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-    val reader = format.createRecordReader(inputSplit, attemptContext)
-    reader.initialize(inputSplit, attemptContext)
-
-    val queryStartTime = System.currentTimeMillis
-
-    new Iterator[V] {
-      private var havePair = false
-      private var finished = false
-      private var count = 0
-
-      context.addTaskCompletionListener { context =>
-        logStatistics(queryStartTime, count)
-        reader.close()
-      }
-
-      override def hasNext: Boolean = {
-        if (context.isInterrupted) {
-          throw new TaskKilledException
-        }
-        if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
-          if (finished) {
-            reader.close()
-          }
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        val value: V = reader.getCurrentValue
-        count += 1
-        value
-      }
-    }
-  }
-
-  private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
-    CarbonInputFormat.setCarbonTable(conf, carbonTable)
-    createInputFormat(conf)
-  }
-
-  private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
-    CarbonInputFormat.setCarbonReadSupport(classOf[RawDataReadSupport], conf)
-    createInputFormat(conf)
-  }
-
-  private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
-    val format = new CarbonInputFormat[V]
-    CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
-    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
-    val projection = new CarbonProjection
-    columnProjection.foreach { attr =>
-      projection.addColumn(attr.name)
-    }
-    CarbonInputFormat.setColumnProjection(conf, projection)
-    format
-  }
-
-  def logStatistics(queryStartTime: Long, recordCount: Int): Unit = {
-    var queryStatistic = new QueryStatistic()
-    queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
-      System.currentTimeMillis - queryStartTime)
-    val statisticRecorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
-    statisticRecorder.recordStatistics(queryStatistic)
-    // result size
-    queryStatistic = new QueryStatistic()
-    queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
-    statisticRecorder.recordStatistics(queryStatistic)
-    // print executor query statistics for each task_id
-    statisticRecorder.logStatisticsAsTableExecutor()
-  }
-
-  /**
-   * Get the preferred locations where to launch this task.
-   */
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonSparkPartition]
-    val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
-    firstOptionLocation
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
deleted file mode 100644
index 44a2416..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.{Date, UUID}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableConfiguration}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.common.CarbonIterator
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.StandardLogService
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-import org.apache.carbondata.hadoop.csv.CSVInputFormat
-import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.DataLoadExecutor
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
-import org.apache.carbondata.spark.DataLoadResult
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-/**
- * It loads the data to carbon using @AbstractDataLoadProcessorStep
- */
-class NewCarbonDataLoadRDD[K, V](
-    sc: SparkContext,
-    result: DataLoadResult[K, V],
-    carbonLoadModel: CarbonLoadModel,
-    loadCount: Integer,
-    blocksGroupBy: Array[(String, Array[BlockDetails])],
-    isTableSplitPartition: Boolean)
-  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new Date())
-  }
-
-  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast =
-    sc.broadcast(new CarbonSerializableConfiguration(sc.hadoopConfiguration))
-
-  override def getPartitions: Array[Partition] = {
-    if (isTableSplitPartition) {
-      // for table split partition
-      var splits: Array[TableSplit] = null
-
-      if (carbonLoadModel.isDirectLoad) {
-        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
-      } else {
-        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName, null)
-      }
-
-      splits.zipWithIndex.map { s =>
-        // filter the same partition unique id, because only one will match, so get 0 element
-        val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
-          p._1 == s._1.getPartition.getUniqueID)(0)._2
-        new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
-      }
-    } else {
-      // for node partition
-      blocksGroupBy.zipWithIndex.map { b =>
-        new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
-      }
-    }
-  }
-
-  override def checkpoint() {
-    // Do nothing. Hadoop RDD should not be checkpointed.
-  }
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val iter = new Iterator[(K, V)] {
-      var partitionID = "0"
-      val loadMetadataDetails = new LoadMetadataDetails()
-      var model: CarbonLoadModel = _
-      var uniqueLoadStatusId =
-        carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
-      try {
-        loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
-
-        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
-        val recordReaders = getInputIterators
-        val loader = new SparkPartitionLoader(model,
-          theSplit.index,
-          null,
-          null,
-          loadCount,
-          loadMetadataDetails)
-        // Intialize to set carbon properties
-        loader.initialize()
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-        new DataLoadExecutor().execute(model,
-          loader.storeLocation,
-          recordReaders)
-      } catch {
-        case e: BadRecordFoundException =>
-          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-          logInfo("Bad Record Found")
-        case e: Exception =>
-          logInfo("DataLoad failure", e)
-          LOGGER.error(e)
-          throw e
-      }
-
-      def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
-        val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, theSplit.index, 0)
-        var configuration: Configuration = confBroadcast.value.value
-        // Broadcast fails in some cases WTF??
-        if (configuration == null) {
-          configuration = new Configuration()
-        }
-        configureCSVInputFormat(configuration)
-        val hadoopAttemptContext = newTaskAttemptContext(configuration, attemptId)
-        val format = new CSVInputFormat
-        if (isTableSplitPartition) {
-          // for table split partition
-          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
-          logInfo("Input split: " + split.serializableHadoopSplit.value)
-          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-          if (carbonLoadModel.isDirectLoad) {
-            model = carbonLoadModel.getCopyWithPartition(
-                split.serializableHadoopSplit.value.getPartition.getUniqueID,
-                split.serializableHadoopSplit.value.getPartition.getFilesPath,
-                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-          } else {
-            model = carbonLoadModel.getCopyWithPartition(
-                split.serializableHadoopSplit.value.getPartition.getUniqueID)
-          }
-          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-
-          StandardLogService.setThreadName(partitionID, null)
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
-              partitionID, split.partitionBlocksDetail.length)
-          val readers =
-          split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
-          readers.zipWithIndex.map { case (reader, index) =>
-            new RecordReaderIterator(reader,
-              split.partitionBlocksDetail(index),
-              hadoopAttemptContext)
-          }
-        } else {
-          // for node partition
-          val split = theSplit.asInstanceOf[CarbonNodePartition]
-          logInfo("Input split: " + split.serializableHadoopSplit)
-          logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
-              split.serializableHadoopSplit, split.nodeBlocksDetail.length)
-          val blocksID = gernerateBlocksID
-          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-          if (carbonLoadModel.isDirectLoad) {
-            val filelist: java.util.List[String] = new java.util.ArrayList[String](
-                CarbonCommonConstants.CONSTANT_SIZE_TEN)
-            CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
-            model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
-                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-          } else {
-            model = carbonLoadModel.getCopyWithPartition(partitionID)
-          }
-          StandardLogService.setThreadName(blocksID, null)
-          val readers =
-            split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
-          readers.zipWithIndex.map { case (reader, index) =>
-            new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
-          }
-        }
-      }
-
-      def configureCSVInputFormat(configuration: Configuration): Unit = {
-        CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
-        CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
-        CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
-        CSVInputFormat.setHeaderExtractionEnabled(configuration,
-          carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
-        CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
-        CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
-          .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
-            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
-      }
-
-      /**
-       * generate blocks id
-       *
-       * @return
-       */
-      def gernerateBlocksID: String = {
-        if (isTableSplitPartition) {
-          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-          theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
-            .getPartition.getUniqueID + "_" + UUID.randomUUID()
-        } else {
-          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-          UUID.randomUUID()
-        }
-      }
-
-      var finished = false
-
-      override def hasNext: Boolean = {
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        finished = true
-        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
-      }
-    }
-    iter
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    isTableSplitPartition match {
-      case true =>
-        // for table split partition
-        val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
-        val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
-        location
-      case false =>
-        // for node partition
-        val theSplit = split.asInstanceOf[CarbonNodePartition]
-        val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
-        logInfo("Preferred Location for split : " + firstOptionLocation.head)
-        val blockMap = new util.LinkedHashMap[String, Integer]()
-        val tableBlocks = theSplit.blocksDetails
-        tableBlocks.foreach { tableBlock =>
-          tableBlock.getLocations.foreach { location =>
-            if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
-              val currentCount = blockMap.get(location)
-              if (currentCount == null) {
-                blockMap.put(location, 1)
-              } else {
-                blockMap.put(location, currentCount + 1)
-              }
-            }
-          }
-        }
-
-        val sortedList = blockMap.entrySet().asScala.toSeq.sortWith {(nodeCount1, nodeCount2) =>
-          nodeCount1.getValue > nodeCount2.getValue
-        }
-
-        val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
-        firstOptionLocation ++ sortedNodesList
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala b/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
deleted file mode 100644
index 84f398a..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mapred
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.util.SerializableConfiguration
-
-/**
- * It is just dummy class to use Sparks package restricted SparkHadoopMapReduceUtil.
- */
-trait CarbonHadoopMapReduceUtil extends SparkHadoopMapReduceUtil {
-
-}
-
-class CarbonSerializableConfiguration(@transient var conf: Configuration)
-  extends SerializableConfiguration(conf)