You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:39 UTC

[32/50] [abbrv] carbondata git commit: [CARBONDATA-2093] Use small file feature of global sort to minimise the carbondata file count

[CARBONDATA-2093] Use small file feature of global sort to minimise the carbondata file count

This closes #1876


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e527c059
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e527c059
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e527c059

Branch: refs/heads/branch-1.3
Commit: e527c059e81e58503d568c82a3e7ac822a8a5b47
Parents: 8875775
Author: ravipesala <ra...@gmail.com>
Authored: Sun Jan 28 20:37:21 2018 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Sat Feb 3 16:36:30 2018 +0800

----------------------------------------------------------------------
 .../StandardPartitionTableLoadingTestCase.scala |  77 ++++++++++-
 .../load/DataLoadProcessBuilderOnSpark.scala    | 130 +------------------
 .../carbondata/spark/util/DataLoadingUtil.scala | 127 ++++++++++++++++++
 .../management/CarbonLoadDataCommand.scala      |  94 ++++++--------
 .../sort/sortdata/SortParameters.java           |   4 +
 5 files changed, 249 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 16f252b..669d6e7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -16,11 +16,12 @@
  */
 package org.apache.carbondata.spark.testsuite.standardpartition
 
-import java.io.{File, IOException}
+import java.io.{File, FileWriter, IOException}
 import java.util
 import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
@@ -30,7 +31,8 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
   var executorService: ExecutorService = _
@@ -409,6 +411,75 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       sql("select * from  casesensitivepartition where empno=17"))
   }
 
+  test("Partition LOAD with small files") {
+    sql("DROP TABLE IF EXISTS smallpartitionfiles")
+    sql(
+      """
+        | CREATE TABLE smallpartitionfiles(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val inputPath = new File("target/small_files").getCanonicalPath
+    val folder = new File(inputPath)
+    if (folder.exists()) {
+      FileUtils.deleteDirectory(folder)
+    }
+    folder.mkdir()
+    for (i <- 0 to 100) {
+      val file = s"$folder/file$i.csv"
+      val writer = new FileWriter(file)
+      writer.write("id,name,city,age\n")
+      writer.write(s"$i,name_$i,city_${i % 5},${ i % 100 }")
+      writer.close()
+    }
+    sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles")
+    FileUtils.deleteDirectory(folder)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles")
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+    assert(new File(segmentDir).listFiles().length < 50)
+  }
+
+  test("verify partition read with small files") {
+    try {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
+      sql("DROP TABLE IF EXISTS smallpartitionfilesread")
+      sql(
+        """
+          | CREATE TABLE smallpartitionfilesread(id INT, name STRING, age INT) PARTITIONED BY
+          | (city STRING)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      val inputPath = new File("target/small_files").getCanonicalPath
+      val folder = new File(inputPath)
+      if (folder.exists()) {
+        FileUtils.deleteDirectory(folder)
+      }
+      folder.mkdir()
+      for (i <- 0 until 100) {
+        val file = s"$folder/file$i.csv"
+        val writer = new FileWriter(file)
+        writer.write("id,name,city,age\n")
+        writer.write(s"$i,name_$i,city_${ i },${ i % 100 }")
+        writer.close()
+      }
+      sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfilesread")
+      FileUtils.deleteDirectory(folder)
+      val dataFrame = sql("select * from smallpartitionfilesread")
+      val scanRdd = dataFrame.queryExecution.sparkPlan.collect {
+        case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd
+          .asInstanceOf[CarbonScanRDD]
+      }.head
+      assert(scanRdd.getPartitions.length < 10)
+      assertResult(100)(dataFrame.count)
+    } finally {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION ,
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+    }
+  }
+
+
+
   def restoreData(dblocation: String, tableName: String) = {
     val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
     val source = dblocation+ "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
@@ -435,6 +506,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
 
 
   override def afterAll = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION ,
+      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
     dropTable
     if (executorService != null && !executorService.isShutdown) {
       executorService.shutdownNow()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 781b484..8be70a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -17,26 +17,12 @@
 
 package org.apache.carbondata.spark.load
 
-import java.text.SimpleDateFormat
-import java.util.{Comparator, Date, Locale}
-
-import scala.collection.mutable.ArrayBuffer
+import java.util.Comparator
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 import org.apache.spark.TaskContext
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
-import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -45,12 +31,10 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Use sortBy operator in spark to load the data
@@ -68,7 +52,7 @@ object DataLoadProcessBuilderOnSpark {
     } else {
       // input data from files
       val columnCount = model.getCsvHeaderColumns.length
-      csvFileScanRDD(sparkSession, model, hadoopConf)
+      DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf)
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
     }
 
@@ -166,112 +150,4 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
-
-  /**
-   * creates a RDD that does reading of multiple CSV files
-   */
-  def csvFileScanRDD(
-      spark: SparkSession,
-      model: CarbonLoadModel,
-      hadoopConf: Configuration
-  ): RDD[InternalRow] = {
-    // 1. partition
-    val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
-    val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
-    val defaultParallelism = spark.sparkContext.defaultParallelism
-    CommonUtil.configureCSVInputFormat(hadoopConf, model)
-    hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
-    val jobConf = new JobConf(hadoopConf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    val jobContext = new JobContextImpl(jobConf, null)
-    val inputFormat = new CSVInputFormat()
-    val rawSplits = inputFormat.getSplits(jobContext).toArray
-    val splitFiles = rawSplits.map { split =>
-      val fileSplit = split.asInstanceOf[FileSplit]
-      PartitionedFile(
-        InternalRow.empty,
-        fileSplit.getPath.toString,
-        fileSplit.getStart,
-        fileSplit.getLength,
-        fileSplit.getLocations)
-    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
-    val bytesPerCore = totalBytes / defaultParallelism
-
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
-    LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
-                s"open cost is considered as scanning $openCostInBytes bytes.")
-
-    val partitions = new ArrayBuffer[FilePartition]
-    val currentFiles = new ArrayBuffer[PartitionedFile]
-    var currentSize = 0L
-
-    def closePartition(): Unit = {
-      if (currentFiles.nonEmpty) {
-        val newPartition =
-          FilePartition(
-            partitions.size,
-            currentFiles.toArray.toSeq)
-        partitions += newPartition
-      }
-      currentFiles.clear()
-      currentSize = 0
-    }
-
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
-        closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
-      currentFiles += file
-    }
-    closePartition()
-
-    // 2. read function
-    val serializableConfiguration = new SerializableConfiguration(jobConf)
-    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
-      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
-        new Iterator[InternalRow] {
-          val hadoopConf = serializableConfiguration.value
-          val jobTrackerId: String = {
-            val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-            formatter.format(new Date())
-          }
-          val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
-          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
-          val inputSplit =
-            new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
-          var finished = false
-          val inputFormat = new CSVInputFormat()
-          val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
-          reader.initialize(inputSplit, hadoopAttemptContext)
-
-          override def hasNext: Boolean = {
-            if (!finished) {
-              if (reader != null) {
-                if (reader.nextKeyValue()) {
-                  true
-                } else {
-                  finished = true
-                  reader.close()
-                  false
-                }
-              } else {
-                finished = true
-                false
-              }
-            } else {
-              false
-            }
-          }
-
-          override def next(): InternalRow = {
-            new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
-          }
-        }
-      }
-    }
-    new FileScanRDD(spark, readFunction, partitions)
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 5e9f7fe..8b4c232 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -17,12 +17,28 @@
 
 package org.apache.carbondata.spark.util
 
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
 import scala.collection.{immutable, mutable}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -32,10 +48,13 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark.LOGGER
 import org.apache.carbondata.spark.load.ValidateUtil
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 /**
  * the util object of data loading
@@ -403,4 +422,112 @@ object DataLoadingUtil {
     }
   }
 
+  /**
+   * creates a RDD that does reading of multiple CSV files
+   */
+  def csvFileScanRDD(
+      spark: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration
+  ): RDD[InternalRow] = {
+    // 1. partition
+    val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+    val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+    val defaultParallelism = spark.sparkContext.defaultParallelism
+    CommonUtil.configureCSVInputFormat(hadoopConf, model)
+    hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val jobContext = new JobContextImpl(jobConf, null)
+    val inputFormat = new CSVInputFormat()
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val splitFiles = rawSplits.map { split =>
+      val fileSplit = split.asInstanceOf[FileSplit]
+      PartitionedFile(
+        InternalRow.empty,
+        fileSplit.getPath.toString,
+        fileSplit.getStart,
+        fileSplit.getLength,
+        fileSplit.getLocations)
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+                s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        val newPartition =
+          FilePartition(
+            partitions.size,
+            currentFiles.toArray.toSeq)
+        partitions += newPartition
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
+    }
+    closePartition()
+
+    // 2. read function
+    val serializableConfiguration = new SerializableConfiguration(jobConf)
+    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+        new Iterator[InternalRow] {
+          val hadoopConf = serializableConfiguration.value
+          val jobTrackerId: String = {
+            val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+            formatter.format(new Date())
+          }
+          val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+          val inputSplit =
+            new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
+          var finished = false
+          val inputFormat = new CSVInputFormat()
+          val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
+          reader.initialize(inputSplit, hadoopAttemptContext)
+
+          override def hasNext: Boolean = {
+            if (!finished) {
+              if (reader != null) {
+                if (reader.nextKeyValue()) {
+                  true
+                } else {
+                  finished = true
+                  reader.close()
+                  false
+                }
+              } else {
+                finished = true
+                false
+              }
+            } else {
+              false
+            }
+          }
+
+          override def next(): InternalRow = {
+            new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+          }
+        }
+      }
+    }
+    new FileScanRDD(spark, readFunction, partitions)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 8e6c20e..7d49c11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -26,22 +26,18 @@ import scala.collection.mutable
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{NewHadoopRDD, RDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -60,22 +56,21 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath}
+import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
-import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.exception.{NoRetryException}
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel}
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -95,6 +90,10 @@ case class CarbonLoadDataCommand(
 
   var table: CarbonTable = _
 
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -113,6 +112,15 @@ case class CarbonLoadDataCommand(
         }
         relation.carbonTable
       }
+    if (table.isHivePartitionTable) {
+      logicalPartitionRelation =
+        new FindDataSourceTable(sparkSession).apply(
+          sparkSession.sessionState.catalog.lookupRelation(
+            TableIdentifier(tableName, databaseNameOp))).collect {
+          case l: LogicalRelation => l
+        }.head
+      sizeInBytes = logicalPartitionRelation.relation.sizeInBytes
+    }
     operationContext.setProperty("isOverwrite", isOverwriteTable)
     if(CarbonUtil.hasAggregationDataMap(table)) {
       val loadMetadataEvent = new LoadMetadataEvent(table, false)
@@ -500,20 +508,7 @@ case class CarbonLoadDataCommand(
       operationContext: OperationContext) = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
-    val logicalPlan =
-      sparkSession.sessionState.catalog.lookupRelation(
-        identifier)
-    val catalogTable: CatalogTable = logicalPlan.collect {
-      case l: LogicalRelation => l.catalogTable.get
-      case c // To make compatabile with spark 2.1 and 2.2 we need to compare classes
-        if c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
-            c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
-            c.getClass.getName.equals(
-              "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
-        CarbonReflectionUtils.getFieldOfCatalogTable(
-          "tableMeta",
-          c).asInstanceOf[CatalogTable]
-    }.head
+    val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
     val currentPartitions =
       CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier)
     // Clean up the alreday dropped partitioned data
@@ -581,10 +576,6 @@ case class CarbonLoadDataCommand(
 
       } else {
         // input data from csv files. Convert to logical plan
-        CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
-        hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
-        val jobConf = new JobConf(hadoopConf)
-        SparkHadoopUtil.get.addCredentials(jobConf)
         val attributes =
           StructType(carbonLoadModel.getCsvHeaderColumns.map(
             StructField(_, StringType))).toAttributes
@@ -603,28 +594,27 @@ case class CarbonLoadDataCommand(
         }
         val len = rowDataTypes.length
         var rdd =
-          new NewHadoopRDD[NullWritable, StringArrayWritable](
-            sparkSession.sparkContext,
-            classOf[CSVInputFormat],
-            classOf[NullWritable],
-            classOf[StringArrayWritable],
-            jobConf).map { case (key, value) =>
-            val data = new Array[Any](len)
-            var i = 0
-            val input = value.get()
-            val inputLen = Math.min(input.length, len)
-            while (i < inputLen) {
-              data(i) = UTF8String.fromString(input(i))
-              // If partition column then update empty value with special string otherwise spark
-              // makes it as null so we cannot internally handle badrecords.
-              if (partitionColumns(i)) {
-                if (input(i) != null && input(i).isEmpty) {
-                  data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+          DataLoadingUtil.csvFileScanRDD(
+            sparkSession,
+            model = carbonLoadModel,
+            hadoopConf)
+            .map { row =>
+              val data = new Array[Any](len)
+              var i = 0
+              val input = row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]]
+              val inputLen = Math.min(input.length, len)
+              while (i < inputLen) {
+                data(i) = UTF8String.fromString(input(i))
+                // If partition column then update empty value with special string otherwise spark
+                // makes it as null so we cannot internally handle badrecords.
+                if (partitionColumns(i)) {
+                  if (input(i) != null && input(i).isEmpty) {
+                    data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+                  }
                 }
+                i = i + 1
               }
-              i = i + 1
-            }
-            InternalRow.fromSeq(data)
+              InternalRow.fromSeq(data)
 
           }
         // Only select the required columns
@@ -638,10 +628,6 @@ case class CarbonLoadDataCommand(
         }
         Project(output, LogicalRDD(attributes, rdd)(sparkSession))
       }
-      // TODO need to find a way to avoid double lookup
-      val sizeInBytes =
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
-          catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
       val convertRelation = convertToLogicalRelation(
         catalogTable,
         sizeInBytes,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index a2248ee..98d150e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -403,6 +403,10 @@ public class SortParameters implements Serializable {
     LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
 
     int numberOfCores = carbonProperties.getNumberOfCores() / 2;
+    // In case of loading from partition we should use the cores specified by it
+    if (configuration.getWritingCoresCount() > 0) {
+      numberOfCores = configuration.getWritingCoresCount();
+    }
     parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
 
     parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties