You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:14 UTC

[18/50] [abbrv] carbondata git commit: [CARBONDATA-1880] Combine input small files for GLOBAL_SORT

[CARBONDATA-1880] Combine input small files for GLOBAL_SORT

Combine input small files for GLOBAL_SORT to avoid carbon small file issue

This closes #1669


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/694ee774
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/694ee774
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/694ee774

Branch: refs/heads/fgdatamap
Commit: 694ee774cb8ca55ee24cd906368c3cf9cc96b0eb
Parents: 28c9418
Author: QiangCai <qi...@qq.com>
Authored: Fri Dec 15 22:02:28 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 20 14:58:37 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +
 .../carbondata/core/util/CarbonProperties.java  |  14 ++
 .../hadoop/CarbonMultiBlockSplit.java           |  17 ++-
 ...ompactionSupportGlobalSortFunctionTest.scala |   4 +-
 ...mpactionSupportGlobalSortParameterTest.scala |   5 -
 .../MajorCompactionIgnoreInMinorTest.scala      |  16 +-
 .../dataload/TestGlobalSortDataLoad.scala       |  33 ++++-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   1 +
 .../load/DataLoadProcessBuilderOnSpark.scala    | 148 ++++++++++++++++---
 .../load/DataLoadProcessorStepOnSpark.scala     |   7 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 121 +++++++++++----
 .../apache/spark/sql/util/CarbonException.scala |   8 +-
 .../apache/spark/sql/util/SparkSQLUtil.scala    |  25 ++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   2 +-
 15 files changed, 333 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f67b0c5..9534099 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1277,6 +1277,10 @@ public final class CarbonCommonConstants {
   public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution";
   public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false";
 
+  @CarbonProperty
+  public static final String CARBON_COMBINE_SMALL_INPUT_FILES = "carbon.mergeSmallFileRead.enable";
+  public static final String CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT = "false";
+
   public static final int DICTIONARY_DEFAULT_CARDINALITY = 1;
   @CarbonProperty
   public static final String SPARK_SCHEMA_STRING_LENGTH_THRESHOLD =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index fe396cb..11aea99 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -106,6 +106,7 @@ public final class CarbonProperties {
     validateLockType();
     validateCarbonCSVReadBufferSizeByte();
     validateHandoffSize();
+    validateCombineSmallInputFiles();
   }
 
   private void validateCarbonCSVReadBufferSizeByte() {
@@ -205,6 +206,19 @@ public final class CarbonProperties {
     }
   }
 
+  private void validateCombineSmallInputFiles() {
+    String combineSmallInputFilesStr =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES);
+    boolean isValidBooleanValue = CarbonUtil.validateBoolean(combineSmallInputFilesStr);
+    if (!isValidBooleanValue) {
+      LOGGER.warn("The combine small files value \"" + combineSmallInputFilesStr
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT);
+      carbonProperties.setProperty(CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES,
+          CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT);
+    }
+  }
+
   private void validateEnableUnsafeSort() {
     String unSafeSortStr = carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT);
     boolean isValidBooleanValue = CarbonUtil.validateBoolean(unSafeSortStr);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 96fe909..aed3449 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -47,15 +47,19 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
 
   private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
+  private long length;
+
   public CarbonMultiBlockSplit() {
     splitList = null;
     locations = null;
+    length = 0;
   }
 
   public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList,
       String[] locations) throws IOException {
     this.splitList = splitList;
     this.locations = locations;
+    calculateLength();
   }
 
   public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList,
@@ -63,6 +67,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
     this.splitList = splitList;
     this.locations = locations;
     this.fileFormat = fileFormat;
+    calculateLength();
   }
 
   /**
@@ -75,11 +80,19 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
 
   @Override
   public long getLength() throws IOException, InterruptedException {
+    return length;
+  }
+
+  public void setLength(long length) {
+    this.length = length;
+  }
+
+  private void calculateLength() {
     long total = 0;
-    for (InputSplit split: splitList) {
+    for (CarbonInputSplit split : splitList) {
       total += split.getLength();
     }
-    return total;
+    length = total;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index 4958f55..9014edb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -439,7 +439,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
     }
     sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
 
-    assert(getIndexFileCount("compaction_globalsort", "0.1") === 3)
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
     checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72)))
     checkAnswer(sql("SELECT * FROM compaction_globalsort order by name, id"),
       sql("SELECT * FROM carbon_localsort order by name, id"))
@@ -454,7 +454,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
     }
     sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
 
-    assert(getIndexFileCount("compaction_globalsort", "0.1") === 3)
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
     checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72)))
     checkAnswer(sql("SELECT * FROM compaction_globalsort order by name, id"),
       sql("SELECT * FROM carbon_localsort order by name, id"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index f9959fa..02c602a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.spark.testsuite.datacompaction
 
-import java.io.{File, FilenameFilter}
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.Row
@@ -26,10 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
 
 class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   val filePath: String = s"$resourcesPath/globalsort"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index ed63fdf..61de615 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -59,16 +59,16 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
     )
     // compaction will happen here.
-    sql("alter table ignoremajor compact 'major'"
-    )
-      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
+    sql("alter table ignoremajor compact 'major'")
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
         "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
-      )
-      sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor  OPTIONS" +
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor  OPTIONS" +
         "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
-      )
-      sql("alter table ignoremajor compact 'minor'"
-      )
+    )
+    sql("alter table ignoremajor compact 'minor'"
+    )
 
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 6bbc763..9ce9675 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -17,6 +17,10 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import java.io.{File, FileWriter}
+
+import org.apache.commons.io.FileUtils
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -105,7 +109,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
       "OPTIONS('BAD_RECORDS_ACTION'='REDIRECT')")
 
-    assert(getIndexFileCount("carbon_globalsort") === 3)
+    assert(getIndexFileCount("carbon_globalsort") === 2)
     checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(11)))
   }
 
@@ -115,7 +119,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
       "OPTIONS('SINGLE_PASS'='TRUE')")
 
-    assert(getIndexFileCount("carbon_globalsort") === 3)
+    assert(getIndexFileCount("carbon_globalsort") === 2)
     checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(12)))
     checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
       sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
@@ -164,7 +168,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
     sql("ALTER TABLE carbon_globalsort COMPACT 'MAJOR'")
 
-    assert(getIndexFileCount("carbon_globalsort") === 3)
+    assert(getIndexFileCount("carbon_globalsort") === 2)
     checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(24)))
     checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name, id"),
       sql("SELECT * FROM carbon_localsort_twice ORDER BY name, id"))
@@ -223,7 +227,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
     sql("DELETE FROM carbon_globalsort WHERE id = 1").show
 
-    assert(getIndexFileCount("carbon_globalsort") === 3)
+    assert(getIndexFileCount("carbon_globalsort") === 2)
     checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(11)))
     checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name, id"),
       sql("SELECT * FROM carbon_localsort_delete ORDER BY name, id"))
@@ -250,6 +254,27 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
       sql("SELECT * FROM carbon_localsort_update ORDER BY name, id"))
   }
 
+  test("LOAD with small files") {
+    val inputPath = new File("target/small_files").getCanonicalPath
+    val folder = new File(inputPath)
+    if (folder.exists()) {
+      FileUtils.deleteDirectory(folder)
+    }
+    folder.mkdir()
+    for (i <- 0 to 100) {
+      val file = s"$folder/file$i.csv"
+      val writer = new FileWriter(file)
+      writer.write("id,name,city,age\n")
+      writer.write(s"$i,name_$i,city_$i,${ i % 100 }")
+      writer.close()
+    }
+    sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+    assertResult(5)(new File(segmentDir).listFiles().length)
+  }
+
   // ----------------------------------- INSERT INTO -----------------------------------
   test("INSERT INTO") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "GLOBAL_SORT")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index c137fc7..f73a202 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -70,6 +70,7 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
     sqlContext.sparkContext.parallelize(1 to numRows)
       .map(x => ("a", "b", x))
       .toDF("c1", "c2", "c3")
+      .sort("c3")
   }
 
   def dropTable(): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index c14e0a7..2537a0c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -17,17 +17,26 @@
 
 package org.apache.carbondata.spark.load
 
-import java.util.Comparator
+import java.text.SimpleDateFormat
+import java.util.{Comparator, Date, Locale}
+
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
+import org.apache.spark.TaskContext
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.NewHadoopRDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -36,10 +45,11 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
-import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -49,7 +59,7 @@ object DataLoadProcessBuilderOnSpark {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   def loadDataUsingGlobalSort(
-      sc: SparkContext,
+      sparkSession: SparkSession,
       dataFrame: Option[DataFrame],
       model: CarbonLoadModel,
       hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
@@ -57,21 +67,13 @@ object DataLoadProcessBuilderOnSpark {
       dataFrame.get.rdd
     } else {
       // input data from files
-      CommonUtil.configureCSVInputFormat(hadoopConf, model)
-      hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
       val columnCount = model.getCsvHeaderColumns.length
-      val jobConf = new JobConf(hadoopConf)
-      SparkHadoopUtil.get.addCredentials(jobConf)
-      new NewHadoopRDD[NullWritable, StringArrayWritable](
-        sc,
-        classOf[CSVInputFormat],
-        classOf[NullWritable],
-        classOf[StringArrayWritable],
-        jobConf)
-        .map(x => DataLoadProcessorStepOnSpark.toStringArrayRow(x._2, columnCount))
+      csvFileScanRDD(sparkSession, model, hadoopConf)
+        .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
     }
 
     model.setPartitionId("0")
+    val sc = sparkSession.sparkContext
     val modelBroadcast = sc.broadcast(model)
     val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
 
@@ -160,4 +162,112 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * creates a RDD that does reading of multiple CSV files
+   */
+  def csvFileScanRDD(
+      spark: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration
+  ): RDD[InternalRow] = {
+    // 1. partition
+    val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+    val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+    val defaultParallelism = spark.sparkContext.defaultParallelism
+    CommonUtil.configureCSVInputFormat(hadoopConf, model)
+    hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val jobContext = new JobContextImpl(jobConf, null)
+    val inputFormat = new CSVInputFormat()
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val splitFiles = rawSplits.map { split =>
+      val fileSplit = split.asInstanceOf[FileSplit]
+      PartitionedFile(
+        InternalRow.empty,
+        fileSplit.getPath.toString,
+        fileSplit.getStart,
+        fileSplit.getLength,
+        fileSplit.getLocations)
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+                s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        val newPartition =
+          FilePartition(
+            partitions.size,
+            currentFiles.toArray.toSeq)
+        partitions += newPartition
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
+    }
+    closePartition()
+
+    // 2. read function
+    val serializableConfiguration = new SerializableConfiguration(jobConf)
+    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+        new Iterator[InternalRow] {
+          val hadoopConf = serializableConfiguration.value
+          val jobTrackerId: String = {
+            val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+            formatter.format(new Date())
+          }
+          val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+          val inputSplit =
+            new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
+          var finished = false
+          val inputFormat = new CSVInputFormat()
+          val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
+          reader.initialize(inputSplit, hadoopAttemptContext)
+
+          override def hasNext: Boolean = {
+            if (!finished) {
+              if (reader != null) {
+                if (reader.nextKeyValue()) {
+                  true
+                } else {
+                  finished = true
+                  reader.close()
+                  false
+                }
+              } else {
+                finished = true
+                false
+              }
+            } else {
+              false
+            }
+          }
+
+          override def next(): InternalRow = {
+            new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+          }
+        }
+      }
+    }
+    new FileScanRDD(spark, readFunction, partitions)
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 5e6ba98..154d3ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -23,6 +23,8 @@ import com.univocity.parsers.common.TextParsingException
 import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
@@ -30,7 +32,6 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
@@ -45,9 +46,9 @@ import org.apache.carbondata.spark.util.Util
 object DataLoadProcessorStepOnSpark {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def toStringArrayRow(row: StringArrayWritable, columnCount: Int): StringArrayRow = {
+  def toStringArrayRow(row: InternalRow, columnCount: Int): StringArrayRow = {
     val outRow = new StringArrayRow(new Array[String](columnCount))
-    outRow.setValues(row.get())
+    outRow.setValues(row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]])
   }
 
   def toRDDIterator(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e58bfd4..09dbd71 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -31,6 +31,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hive.DistributionUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -55,7 +57,7 @@ import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
  * level filtering in driver side.
  */
 class CarbonScanRDD(
-    @transient sc: SparkContext,
+    @transient spark: SparkSession,
     val columnProjection: CarbonProjection,
     var filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
@@ -63,7 +65,7 @@ class CarbonScanRDD(
     @transient tableInfo: TableInfo,
     inputMetricsStats: InitInputMetrics,
     @transient val partitionNames: Seq[String])
-  extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) {
+  extends CarbonRDDWithTableInfo[InternalRow](spark.sparkContext, Nil, serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   private val jobTrackerId: String = {
@@ -186,31 +188,74 @@ class CarbonScanRDD(
           }
         }
         noOfNodes = nodeBlockMapping.size
-      } else {
-        if (CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) {
-          // Use blocklet distribution
-          // Randomize the blocklets for better shuffling
-          Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex =>
-            val multiBlockSplit =
-              new CarbonMultiBlockSplit(identifier,
-                Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
-                splitWithIndex._1.getLocations)
-            val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
-            result.add(partition)
+      } else if (CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION,
+        CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) {
+        // Use blocklet distribution
+        // Randomize the blocklets for better shuffling
+        Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex =>
+          val multiBlockSplit =
+            new CarbonMultiBlockSplit(identifier,
+              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
+              splitWithIndex._1.getLocations)
+          val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
+          result.add(partition)
+        }
+      } else if (CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES,
+        CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT).toBoolean) {
+
+        // sort blocks in reverse order of length
+        val blockSplits = splits
+          .asScala
+          .map(_.asInstanceOf[CarbonInputSplit])
+          .groupBy(f => f.getBlockPath)
+          .map { blockSplitEntry =>
+            new CarbonMultiBlockSplit(identifier,
+              blockSplitEntry._2.asJava,
+              blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray)
+          }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse)
+
+        val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+        val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+        val defaultParallelism = spark.sparkContext.defaultParallelism
+        val totalBytes = blockSplits.map(_.getLength + openCostInBytes).sum
+        val bytesPerCore = totalBytes / defaultParallelism
+
+        val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+        LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+                    s"open cost is considered as scanning $openCostInBytes bytes.")
+
+        val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit]
+        var currentSize = 0L
+
+        def closePartition(): Unit = {
+          if (currentFiles.nonEmpty) {
+            result.add(combineSplits(currentFiles, currentSize, result.size()))
           }
-        } else {
-          // Use block distribution
-          splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).
-            groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { splitWithIndex =>
-            val multiBlockSplit =
-              new CarbonMultiBlockSplit(identifier,
-                splitWithIndex._1.asJava,
-                splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray)
-            val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
-            result.add(partition)
+          currentFiles.clear()
+          currentSize = 0
+        }
+
+        blockSplits.foreach { file =>
+          if (currentSize + file.getLength > maxSplitBytes) {
+            closePartition()
           }
+          // Add the given file to the current partition.
+          currentSize += file.getLength + openCostInBytes
+          currentFiles += file
+        }
+        closePartition()
+      } else {
+        // Use block distribution
+        splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+          .groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { splitWithIndex =>
+          val multiBlockSplit =
+            new CarbonMultiBlockSplit(identifier,
+              splitWithIndex._1.asJava,
+              splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray)
+          val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
+          result.add(partition)
         }
       }
 
@@ -232,6 +277,32 @@ class CarbonScanRDD(
     result.asScala
   }
 
+  def combineSplits(
+      splits: ArrayBuffer[CarbonMultiBlockSplit],
+      size: Long,
+      partitionId: Int
+  ): CarbonSparkPartition = {
+    val carbonInputSplits = splits.flatMap(_.getAllSplits.asScala)
+
+    // Computes total number of bytes can be retrieved from each host.
+    val hostToNumBytes = mutable.HashMap.empty[String, Long]
+    splits.foreach { split =>
+      split.getLocations.filter(_ != "localhost").foreach { host =>
+        hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + split.getLength
+      }
+    }
+    // Takes the first 3 hosts with the most data to be retrieved
+    val locations = hostToNumBytes
+      .toSeq
+      .sortBy(_._2)(implicitly[Ordering[Long]].reverse)
+      .take(3)
+      .map(_._1)
+      .toArray
+
+    val multiBlockSplit = new CarbonMultiBlockSplit(null, carbonInputSplits.asJava, locations)
+    new CarbonSparkPartition(id, partitionId, multiBlockSplit)
+  }
+
   override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val queryStartTime = System.currentTimeMillis
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala
index 9fd7099..7dabddb 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.util
 
-  import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.AnalysisException
 
-  object CarbonException {
-    def analysisException(message: String): Nothing = throw new AnalysisException(message)
-  }
+object CarbonException {
+  def analysisException(message: String): Nothing = throw new AnalysisException(message)
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
new file mode 100644
index 0000000..370f80c
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SessionState
+
+object SparkSQLUtil {
+  def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0b786b5..72c979a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -342,7 +342,7 @@ object CarbonDataRDDFactory {
         status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
           loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
         } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-          DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
+          DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
             dataFrame, carbonLoadModel, hadoopConf)
         } else if (dataFrame.isDefined) {
           loadDataFrame(sqlContext, dataFrame, carbonLoadModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 148fca8..ca0c51d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -76,7 +76,7 @@ case class CarbonDatasourceHadoopRelation(
     requiredColumns.foreach(projection.addColumn)
     val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
     new CarbonScanRDD(
-      sparkSession.sparkContext,
+      sparkSession,
       projection,
       filterExpression.orNull,
       identifier,