Posted to commits@carbondata.apache.org by aj...@apache.org on 2021/05/25 06:59:59 UTC

[carbondata] branch master updated: [CARBONDATA-4183] Local sort Partition Load and Compaction fix

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new a90243c  [CARBONDATA-4183] Local sort Partition Load and Compaction fix
a90243c is described below

commit a90243cc5b6a2748b3078cc2d80d8addb4887b75
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Fri Apr 2 20:17:55 2021 +0530

    [CARBONDATA-4183] Local sort Partition Load and Compaction fix
    
    Why is this PR needed?
    Currently, the number of tasks for a partition table local sort load is decided based on the input file size. Because more tasks are launched than there are nodes, the data is not properly sorted within each node. For compaction, the number of tasks is equal to the number of partitions, so if a partition holds a huge amount of data, compaction can fail with OOM under low memory configurations.
    
    What changes were proposed in this PR?
    When the task-level partition property (carbon.partition.data.on.tasklevel, see the enabling sketch after this commit message) is enabled:
    
    For local sort load, the input files are divided based on node locality (number of tasks = number of nodes), so the data is properly sorted within each node.
    For compaction, tasks are launched based on the task id within a partition, so more tasks are launched per partition and each task processes less data.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4130
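
A minimal sketch, assuming a SparkSession with CarbonData on the classpath, of switching the new behaviour on from application code; it mirrors the test case added at the end of this diff. For data load the new code path only applies when the table's sort_scope is LOCAL_SORT (see the CommonLoadUtils change below):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Enable one-task-per-node local sort load and task-level partition compaction.
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL, "true")

    // Restore the default ("false") when the task-level behaviour is no longer wanted.
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
      CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT)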
---
 .../core/constants/CarbonCommonConstants.java      | 12 ++++
 docs/configuration-parameters.md                   |  1 +
 .../carbondata/spark/load/CsvRDDHelper.scala       | 57 ++++++++++++++--
 .../spark/rdd/CarbonDataRDDFactory.scala           | 52 ++++++++-------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 76 ++++++++++++++++++++--
 .../spark/rdd/CarbonTableCompactor.scala           |  2 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala    |  9 +++
 .../command/management/CommonLoadUtils.scala       | 21 ++++--
 .../StandardPartitionTableCompactionTestCase.scala | 58 ++++++++++++++++-
 9 files changed, 250 insertions(+), 38 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 7598dea9..a187f19 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -975,6 +975,18 @@ public final class CarbonCommonConstants {
   public static final String LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT = "0";
 
   /**
+   * When enabled, tasks for a local sort partition load are launched as one task per node, and
+   * compaction is performed at task level for a partition. Load performance might be degraded,
+   * because the number of tasks launched is equal to the number of nodes in case of local sort.
+   * Reduces memory consumption for both load and compaction.
+   */
+  @CarbonProperty
+  public static final String CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL =
+      "carbon.partition.data.on.tasklevel";
+
+  public static final String CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT = "false";
+
+  /**
    * whether to prefetch data while loading.
    */
   @CarbonProperty
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index f65d36f..f50728a 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -117,6 +117,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.compaction.prefetch.enable | false | Compaction operation is similar to Query + data load where in data from qualifying segments are queried and data loading performed to generate a new single segment. This configuration determines whether to query ahead data from segments and feed it for data loading. **NOTE:** This configuration is disabled by default as it needs extra resources for querying extra data. Based on the memory availability on the cluster, user can enable it to imp [...]
 | carbon.enable.range.compaction | true | To configure Ranges-based Compaction to be used or not for RANGE_COLUMN. If true after compaction also the data would be present in ranges. |
 | carbon.si.segment.merge | false | Making this true degrade the LOAD performance. When the number of small files increase for SI segments(it can happen as number of columns will be less and we store position id and reference columns), user an either set to true which will merge the data files for upcoming loads or run SI refresh command which does this job for all segments. (REFRESH INDEX <index_table>) |
+| carbon.partition.data.on.tasklevel | false | When enabled, tasks for a local sort partition load are launched as one task per node, and compaction is performed at task level for a partition. Load performance might be degraded, because the number of tasks launched is equal to the number of nodes in case of local sort. For compaction, memory consumption will be less, as more tasks will be launched for a partition. |
 
 ## Query Configuration
 
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 326f9b1..9558d12 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.spark.load
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -29,16 +30,22 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.ThreadLocalSessionInfo
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.adapter.CarbonToSparkAdapter
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.{getNodeBlockMapping, LOGGER}
 import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
 
 object CsvRDDHelper {
@@ -108,8 +115,49 @@ object CsvRDDHelper {
     closePartition()
 
     // 2. read function
-    val serializableConfiguration = SparkSQLUtil.getSerializableConfigurableInstance(hadoopConf)
-    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+    val readFunction = getReadFunction(hadoopConf)
+    new FileScanRDD(spark, readFunction, partitions)
+  }
+
+  /**
+   * create a RDD that does reading of multiple CSV files based on data locality
+   */
+  def csvFileScanRDDForLocalSort(
+      spark: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration
+  ): RDD[InternalRow] = {
+    CommonUtil.configureCSVInputFormat(hadoopConf, model)
+    // divide the blocks among the nodes as per the data locality
+    val nodeBlockMapping = getNodeBlockMapping(spark.sqlContext, hadoopConf, model)
+    val partitions = new ArrayBuffer[FilePartition]
+    // create file partition
+    nodeBlockMapping.map { entry =>
+      val files = entry._2.asScala.map(distributable => {
+        val tableBlock = distributable.asInstanceOf[TableBlockInfo]
+        PartitionedFile(
+          InternalRow.empty,
+          tableBlock.getFilePath,
+          tableBlock.getBlockOffset,
+          tableBlock.getBlockLength,
+          tableBlock.getLocations)
+      }).toArray
+      val newPartition =
+        CarbonToSparkAdapter.createFilePartition(
+          partitions.size,
+          collection.mutable.ArrayBuffer(files: _*))
+      partitions += newPartition
+    }
+
+    // 2. read function
+    val readFunction = getReadFunction(hadoopConf)
+    new FileScanRDD(spark, readFunction, partitions)
+  }
+
+  private def getReadFunction(configuration: Configuration): (PartitionedFile =>
+    Iterator[InternalRow]) = {
+    val serializableConfiguration = SparkSQLUtil.getSerializableConfigurableInstance(configuration)
+    new (PartitionedFile => Iterator[InternalRow]) with Serializable {
       override def apply(file: PartitionedFile): Iterator[InternalRow] = {
         new Iterator[InternalRow] {
           ThreadLocalSessionInfo.setConfigurationToCurrentThread(serializableConfiguration.value)
@@ -149,7 +197,6 @@ object CsvRDDHelper {
         }
       }
     }
-    new FileScanRDD(spark, readFunction, partitions)
   }
 
 }
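
The key idea of csvFileScanRDDForLocalSort above is that blocks are grouped by the node that hosts them, so the number of FilePartitions (and therefore load tasks) equals the number of nodes rather than the number of input files. A simplified, self-contained sketch of that grouping, using a hypothetical Block type instead of the real Distributable/TableBlockInfo plumbing (the real code additionally balances sizes and handles inactive hosts):

    object NodeGroupingSketch {
      // Hypothetical stand-in for a CSV block with its preferred host.
      final case class Block(path: String, host: String, lengthInBytes: Long)

      // One bucket per node.
      def groupByNode(blocks: Seq[Block]): Map[String, Seq[Block]] =
        blocks.groupBy(_.host)

      def main(args: Array[String]): Unit = {
        val blocks = Seq(
          Block("/data/part-0.csv", "node1", 128L),
          Block("/data/part-1.csv", "node2", 256L),
          Block("/data/part-2.csv", "node1", 64L))
        // Two distinct hosts -> two partitions/tasks, regardless of file count or size.
        assert(groupByNode(blocks).size == 2)
      }
    }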
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 9405f3a..a6b8832 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -848,6 +848,31 @@ object CarbonDataRDDFactory {
       hadoopConf: Configuration,
       segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    val nodeBlockMapping = getNodeBlockMapping(sqlContext, hadoopConf, carbonLoadModel)
+    val blocksGroupBy: Array[(String, Array[BlockDetails])] = nodeBlockMapping.map { entry =>
+      val blockDetailsList =
+        entry._2.asScala.map(distributable => {
+          val tableBlock = distributable.asInstanceOf[TableBlockInfo]
+          new BlockDetails(new Path(tableBlock.getFilePath),
+            tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
+          )
+        }).toArray
+      (entry._1, blockDetailsList)
+    }.toArray
+
+    new NewCarbonDataLoadRDD(
+      sqlContext.sparkSession,
+      new DataLoadResultImpl(),
+      carbonLoadModel,
+      blocksGroupBy,
+      segmentMetaDataAccumulator
+    ).collect()
+  }
+
+  def getNodeBlockMapping(
+      sqlContext: SQLContext,
+      hadoopConf: Configuration,
+      carbonLoadModel: CarbonLoadModel): Seq[(String, util.List[Distributable])] = {
     /*
      * when data load handle by node partition
      * 1)clone the hadoop configuration,and set the file path to the configuration
@@ -881,10 +906,10 @@ object CarbonDataRDDFactory {
     val activeNodes = DistributionUtil
       .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
     val skewedDataOptimization = CarbonProperties.getInstance()
-      .isLoadSkewedDataOptimizationEnabled()
+      .isLoadSkewedDataOptimizationEnabled
     // get user ddl input the node loads the smallest amount of data
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    var loadMinSize = carbonLoadModel.getLoadMinSize()
+    var loadMinSize = carbonLoadModel.getLoadMinSize
     if (loadMinSize.equalsIgnoreCase(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)) {
       loadMinSize = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
         .getOrElse(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
@@ -906,13 +931,13 @@ object CarbonDataRDDFactory {
     val timeElapsed: Long = System.currentTimeMillis - startTime
     LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
     LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
-                s"No.of Nodes: ${nodeBlockMapping.size}")
+                s"No.of Nodes: ${ nodeBlockMapping.size }")
     var str = ""
     nodeBlockMapping.foreach { entry =>
       val tableBlock = entry._2
       val totalSize = tableBlock.asScala.map(_.asInstanceOf[TableBlockInfo].getBlockLength).sum
       str = str + "#Node: " + entry._1 + ", no.of.blocks: " + tableBlock.size() +
-            f", totalsize.of.blocks: ${totalSize * 0.1 * 10 / 1024 /1024}%.2fMB"
+            f", totalsize.of.blocks: ${ totalSize * 0.1 * 10 / 1024 / 1024 }%.2fMB"
       tableBlock.asScala.foreach(tableBlockInfo =>
         if (!tableBlockInfo.getLocations.exists(hostEntry =>
           hostEntry.equalsIgnoreCase(entry._1)
@@ -924,24 +949,7 @@ object CarbonDataRDDFactory {
       str = str + "\n"
     }
     LOGGER.info(str)
-    val blocksGroupBy: Array[(String, Array[BlockDetails])] = nodeBlockMapping.map { entry =>
-      val blockDetailsList =
-        entry._2.asScala.map(distributable => {
-          val tableBlock = distributable.asInstanceOf[TableBlockInfo]
-          new BlockDetails(new Path(tableBlock.getFilePath),
-            tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
-          )
-        }).toArray
-      (entry._1, blockDetailsList)
-    }.toArray
-
-    new NewCarbonDataLoadRDD(
-      sqlContext.sparkSession,
-      new DataLoadResultImpl(),
-      carbonLoadModel,
-      blocksGroupBy,
-      segmentMetaDataAccumulator
-    ).collect()
+    nodeBlockMapping
   }
 
 }
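
With getNodeBlockMapping extracted and made public, the CSV helper above can reuse it. A hedged sketch of consuming the result (sqlContext, hadoopConf and carbonLoadModel are assumed to already be in scope, for example inside the load flow shown in this file); each entry pairs a node name with the Distributable blocks assigned to it:

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.datastore.block.TableBlockInfo
    import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory

    val nodeBlockMapping =
      CarbonDataRDDFactory.getNodeBlockMapping(sqlContext, hadoopConf, carbonLoadModel)
    nodeBlockMapping.foreach { case (node, blocks) =>
      // Sum the block lengths assigned to this node, as the logging above does.
      val totalBytes = blocks.asScala.map(_.asInstanceOf[TableBlockInfo].getBlockLength).sum
      println(s"node=$node blocks=${blocks.size()} bytes=$totalBytes")
    }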
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index a6bd1bd..27a13da 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -359,8 +359,12 @@ class CarbonMergerRDD[K, V](
         .map(_.asInstanceOf[CarbonInputSplit])
         .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
     }
+    var hasExternalSegment = false
     val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { inputSplit =>
       val segmentId = Segment.toSegment(inputSplit.getSegmentId).getSegmentNo
+      if (inputSplit.getSegment.isExternalSegment) {
+        hasExternalSegment = true
+      }
       val blockInfo = new TableBlockInfo(inputSplit.getFilePath,
         inputSplit.getStart, inputSplit.getSegmentId,
         inputSplit.getLocations, inputSplit.getLength, inputSplit.getVersion,
@@ -434,6 +438,12 @@ class CarbonMergerRDD[K, V](
     }
 
     val partitionTaskMap = new util.HashMap[PartitionSpec, String]()
+    val partitionTaskMapBasedOnId = new util.HashMap[PartitionSpec, util.HashMap[String, Int]]()
+    val isPartitionBasedOnTask = CarbonProperties
+      .getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
+        CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT)
+      .toBoolean
     val counter = new AtomicInteger()
     var indexOfRangeColumn = -1
     // As we are already handling null values in the filter expression separately so we
@@ -472,7 +482,11 @@ class CarbonMergerRDD[K, V](
       carbonInputSplits.foreach { split =>
         var dataFileFooter: DataFileFooter = null
         if (null == rangeColumn) {
-          val taskNo = getTaskNo(split, partitionTaskMap, counter)
+          val taskNo = if (isPartitionBasedOnTask && !hasExternalSegment) {
+            getTaskNumberBasedOnID(split, partitionTaskMapBasedOnId, counter)
+          } else {
+            getTaskNo(split, partitionTaskMap, counter)
+          }
           val splitList = taskIdMapping.get(taskNo)
           noOfBlocks += 1
           if (null == splitList) {
@@ -590,12 +604,15 @@ class CarbonMergerRDD[K, V](
                 splitListForRange,
                 Array(nodeName))
             }
+            val partitionSpec = getPartitionNamesFromTask(taskInfo.getTaskId,
+              partitionTaskMap,
+              partitionTaskMapBasedOnId)
             result.add(
               new CarbonSparkPartition(
                 id,
                 taskPartitionNo,
                 multiBlockSplit,
-                getPartitionNamesFromTask(taskInfo.getTaskId, partitionTaskMap)))
+                partitionSpec))
             taskPartitionNo += 1
           }
         }
@@ -700,7 +717,7 @@ class CarbonMergerRDD[K, V](
         case Some(part) => part
         case None =>
           throw new UnsupportedOperationException("Cannot do compaction on dropped partition")
-      }
+        }
       var task = partitionTaskMap.get(partTask)
       if (task == null) {
         task = counter.incrementAndGet().toString
@@ -712,10 +729,59 @@ class CarbonMergerRDD[K, V](
     }
   }
 
+  /**
+   * Group the data files based on task Id per partition
+   */
+  private def getTaskNumberBasedOnID(
+      split: CarbonInputSplit,
+      partitionTaskMap: util.Map[PartitionSpec, util.HashMap[String, Int]],
+      counter: AtomicInteger): String = {
+    if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+      val path = split.getPath.getParent
+      val partTask =
+        carbonMergerMapping.currentPartitions.get.find(p => p.getLocation.equals(path)) match {
+          case Some(part) => part
+          case None =>
+            throw new UnsupportedOperationException("Cannot do compaction on dropped partition")
+        }
+      var task: String = null
+      val partitionTaskId = CarbonScalaUtil.getTaskIdFromUniqueNumber(split.taskId)
+      if (partitionTaskMap.isEmpty || partitionTaskMap.get(partTask) == null) {
+        if (partitionTaskMap.isEmpty) {
+          task = counter.toString
+        } else {
+          task = counter.incrementAndGet().toString
+        }
+        val partitionTaskList = new util.HashMap[String, Int]()
+        partitionTaskList.put(task, partitionTaskId)
+        partitionTaskMap.put(partTask, partitionTaskList)
+      } else {
+        val taskMap = partitionTaskMap.get(partTask)
+        if (taskMap.containsValue(partitionTaskId)) {
+          task = taskMap.asScala.find(_._2.equals(partitionTaskId)).get._1
+        } else {
+          task = counter.incrementAndGet().toString
+          taskMap.put(task, partitionTaskId)
+        }
+        partitionTaskMap.put(partTask, taskMap)
+      }
+      task
+    } else {
+      split.taskId
+    }
+  }
+
   private def getPartitionNamesFromTask(taskId: String,
-      partitionTaskMap: util.Map[PartitionSpec, String]): Option[PartitionSpec] = {
+      partitionTaskMap: util.Map[PartitionSpec, String],
+      partitionTaskMapBasedOnId: util.Map[PartitionSpec, util.HashMap[String, Int]])
+  : Option[PartitionSpec] = {
     if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
-      Some(partitionTaskMap.asScala.find(f => f._2.equals(taskId)).get._1)
+      if (!partitionTaskMapBasedOnId.isEmpty) {
+        Some(partitionTaskMapBasedOnId.asScala
+          .find(f => f._2.asScala.exists(_._1.equals(taskId))).get._1)
+      } else {
+        Some(partitionTaskMap.asScala.find(f => f._2.equals(taskId)).get._1)
+      }
     } else {
       None
     }
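
Conceptually, getTaskNumberBasedOnID buckets the splits of each partition by the task id of the load that wrote them, so a partition written by many load tasks is compacted by several tasks instead of a single one. A simplified, hypothetical illustration of that bucketing (string partition names stand in for PartitionSpec):

    object CompactionBucketSketch {
      // Hypothetical split: the partition it belongs to and the id of the load task
      // that produced its data file.
      final case class Split(partition: String, writerTaskId: Int)

      // One compaction task per (partition, original load task) pair, instead of
      // one task per partition as before.
      def buckets(splits: Seq[Split]): Map[(String, Int), Seq[Split]] =
        splits.groupBy(s => (s.partition, s.writerTaskId))

      def main(args: Array[String]): Unit = {
        val splits = Seq(
          Split("country=IN", 0), Split("country=IN", 1), Split("country=US", 0))
        // Three buckets -> three compaction tasks (two for country=IN, one for country=US).
        assert(buckets(splits).size == 3)
      }
    }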
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 642387c..bf3eed3 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -267,7 +267,7 @@ class CarbonTableCompactor(
       )
       val partitionSpecs = mergeRDD.getPartitions.map { partition =>
         partition.asInstanceOf[CarbonSparkPartition].partitionSpec.get
-      }
+      }.distinct
       if (partitionSpecs != null && partitionSpecs.nonEmpty) {
         compactionCallableModel.compactedPartitions = Some(partitionSpecs)
       }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 3e1374e..7cb977c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -483,6 +483,15 @@ object CarbonScalaUtil {
   }
 
   /**
+   * Get the task id from unique number
+   */
+  def getTaskIdFromUniqueNumber(uniqueNumber: String): Int = {
+    // Unique number is a combination of segment id(3 digits), task id (6 digits) and
+    // partition number (6 digits).
+    uniqueNumber.substring(3, 9).toInt - Math.pow(10, 5).toInt
+  }
+
+  /**
    * Use reflection to clean the parser objects which are set in thread local to avoid memory issue
    */
   def cleanParserThreadLocals(): Unit = {
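
A worked example of the arithmetic in getTaskIdFromUniqueNumber above, using a made-up unique number whose layout follows the comment (3-digit segment id, 6-digit task portion, 6-digit partition number):

    // Hypothetical unique number: segment "002", task portion "100023", partition "000001".
    val uniqueNumber = "002100023000001"
    val taskId = uniqueNumber.substring(3, 9).toInt - math.pow(10, 5).toInt
    // "100023".toInt - 100000 == 23
    assert(taskId == 23)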
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 3c2e81e..695681a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -996,10 +996,23 @@ object CommonLoadUtils {
         }
       } else {
         val columnCount = loadParams.carbonLoadModel.getCsvHeaderColumns.length
-        val rdd = CsvRDDHelper.csvFileScanRDD(
-          loadParams.sparkSession,
-          model = loadParams.carbonLoadModel,
-          loadParams.hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
+        val partitionBasedOnLocality = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
+            CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT).toBoolean
+        val rdd =
+          if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
+            CsvRDDHelper.csvFileScanRDDForLocalSort(
+              loadParams.sparkSession,
+              model = loadParams.carbonLoadModel,
+              loadParams.hadoopConf)
+              .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
+          } else {
+            CsvRDDHelper.csvFileScanRDD(
+              loadParams.sparkSession,
+              model = loadParams.carbonLoadModel,
+              loadParams.hadoopConf)
+              .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
+          }
         val (transformedPlan, partitions, persistedRDDLocal) =
           transformQueryWithRow(
             rdd.asInstanceOf[RDD[Row]],
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 9d294a5..2d00c69 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -17,12 +17,16 @@
 
 package org.apache.carbondata.spark.testsuite.standardpartition
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndAfterAll {
   // scalastyle:off lineLength
@@ -94,6 +98,58 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"))
   }
 
+  test("data load and compaction for local sort partition table based on task id ") {
+    sql("drop table if exists partitionthree")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL, "TRUE")
+    try {
+      sql("""
+            | CREATE TABLE partitionthree (empno int, doj Timestamp,
+            |  workgroupcategoryname String, deptno int, deptname String,
+            |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+            |  utilization int,salary int)
+            | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+            | STORED AS carbondata
+            | tblproperties('sort_scope'='local_sort', 'sort_columns'='deptname,empname')
+      """.stripMargin)
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+      assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 1)
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+
+      sql("ALTER TABLE partitionthree COMPACT 'MINOR'").collect()
+      sql("clean files for table partitionthree options('force'='true')")
+      assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 1)
+      checkExistence(sql("show segments for table partitionthree"), true, "0.1")
+      checkAnswer(sql(
+        "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
+        "deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization," +
+        " salary from partitionthree where workgroupcategory=1 and empname='arvind' and " +
+        "designation='SE' order by empno"),
+        sql(
+          "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
+          "deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, " +
+          "utilization, salary from originTable where workgroupcategory=1 and empname='arvind' " +
+          "and designation='SE' order by empno"))
+    } finally {
+      sql("drop table if exists partitionthree")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
+          CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT)
+    }
+  }
+
+  private def getPartitionDataFilesCount(tableName: String, partition: String) = {
+    val table: CarbonTable = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
+    FileFactory.getCarbonFile(table.getTablePath + partition)
+      .listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)
+        }
+      }).length
+  }
+
   test("data major compaction for partition table") {
     sql(
       """