You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/10/29 08:23:03 UTC

carbondata git commit: [CARBONDATA-3041]Optimize load minimum size strategy for data loading

Repository: carbondata
Updated Branches:
  refs/heads/master db5da530e -> e2c517e3f


[CARBONDATA-3041]Optimize load minimum size strategy for data loading

This PR makes the following changes:
  1. Delete the system property carbon.load.min.size.enabled and change load_min_size_inmb into a table property; this property can also be specified as a load option.
  2. Support alter table xxx set TBLPROPERTIES('load_min_size_inmb'='256')
  3. If a table is created with the load_min_size_inmb property, display this property via the desc formatted command.

This closes #2864


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e2c517e3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e2c517e3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e2c517e3

Branch: refs/heads/master
Commit: e2c517e3f9225b3b3ff9e55bcfee0d73fe943f01
Parents: db5da53
Author: ndwangsen <lu...@huawei.com>
Authored: Sat Oct 27 10:38:48 2018 +0800
Committer: xuchuanyin <xu...@hust.edu.cn>
Committed: Mon Oct 29 16:20:56 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   3 +-
 .../constants/CarbonLoadOptionConstants.java    |  10 -
 .../carbondata/core/util/CarbonProperties.java  |  11 -
 docs/configuration-parameters.md                |   1 -
 docs/ddl-of-carbondata.md                       |  19 +-
 .../dataload/TestTableLoadMinSize.scala         |  62 +-
 .../carbondata/spark/util/CommonUtil.scala      |  28 +
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |  20 +-
 .../table/CarbonDescribeFormattedCommand.scala  |   6 +
 .../org/apache/spark/util/AlterTableUtil.scala  |  20 +-
 .../loading/model/CarbonLoadModelBuilder.java   | 829 ++++++++++---------
 .../processing/loading/model/LoadOption.java    |   4 +-
 .../processing/util/CarbonLoaderUtil.java       |   4 +-
 14 files changed, 555 insertions(+), 465 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 37e2aa1..bf4f7e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -980,8 +980,7 @@ public final class CarbonCommonConstants {
    */
   @CarbonProperty
   public static final String CARBON_LOAD_MIN_SIZE_INMB = "load_min_size_inmb";
-  public static final String CARBON_LOAD_MIN_NODE_SIZE_INMB_DEFAULT = "256";
-
+  public static final String CARBON_LOAD_MIN_SIZE_INMB_DEFAULT = "0";
   /**
    *  the node minimum load data default value
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 82485ca..0d98cf4 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -151,14 +151,4 @@ public final class CarbonLoadOptionConstants {
   public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE
       = "carbon.load.sortmemory.spill.percentage";
   public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT = "0";
-
-  /**
-   *  if loading data is too small, the original loading method will produce many small files.
-   *  enable set the node load minimum amount of data,avoid producing many small files.
-   *  This option is especially useful when you encounter a lot of small amounts of data.
-   */
-  @CarbonProperty
-  public static final String ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE
-      = "carbon.load.min.size.enabled";
-  public static final String ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 49d89e7..7ec22be 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1249,17 +1249,6 @@ public final class CarbonProperties {
   }
 
   /**
-   * whether optimization for the node loads the minimum amount of data is enabled
-   * @return true, if enabled; false for not enabled.
-   */
-  public boolean isLoadMinSizeOptimizationEnabled() {
-    String loadMinSize = getProperty(
-            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE,
-            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE_DEFAULT);
-    return loadMinSize.equalsIgnoreCase("true");
-  }
-
-  /**
    * returns true if carbon property
    * @param key
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index de98c8d..2a3748c 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -86,7 +86,6 @@ This section provides the details of all the configurations required for the Car
 | carbon.use.local.dir | true | CarbonData,during data loading, writes files to local temp directories before copying the files to HDFS. This configuration is used to specify whether CarbonData can write locally to tmp directory of the container or to the YARN application directory. |
 | carbon.sort.temp.compressor | (none) | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits. These temporary files can be compressed and written in order to save the storage space. This configuration specifies the name of compressor to be used to compress the intermediate sort temp files during sort procedure in data loading. The valid values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty means that Carbondata will not compress the sort temp files. **NOTE:** Compressor will be useful if you encounter disk bottleneck.Since the data needs to be compressed and decompressed,it involves additional CPU cycles,but is compensated by the high IO throughput due to less data to be written or read from the disks. |
 | carbon.load.skewedDataOptimization.enabled | false | During data loading,CarbonData would divide the number of blocks equally so as to ensure all executors process same number of blocks. This mechanism satisfies most of the scenarios and ensures maximum parallel processing for optimal data loading performance.In some business scenarios, there might be scenarios where the size of blocks vary significantly and hence some executors would have to do more work if they get blocks containing more data. This configuration enables size based block allocation strategy for data loading. When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data.**NOTE:** This configuration is useful if the size of your input data files varies widely, say 1MB to 1GB.For this configuration to work effectively,knowing the data pattern and size is important and necessary. |
-| carbon.load.min.size.enabled | false | During Data Loading, CarbonData would divide the number of files among the available executors to parallelize the loading operation. When the input data files are very small, this action causes to generate many small carbondata files. This configuration determines whether to enable node minumun input data size allocation strategy for data loading.It will make sure that the node load the minimum amount of data there by reducing number of carbondata files.**NOTE:** This configuration is useful if the size of the input data files are very small, like 1MB to 256MB. Refer to the load option ***load_min_size_inmb*** to configure the minimum size to be considered for splitting files among executors. |
 | enable.data.loading.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional data loading statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time.It is advised to configure the log files size, retention of log files parameters in log4j properties appropriately. Also extensive logging is an increased IO operation and hence over all data loading performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
 | carbon.dictionary.chunk.size | 10000 | CarbonData generates dictionary keys and writes them to separate dictionary file during data loading. To optimize the IO, this configuration determines the number of dictionary keys to be persisted to dictionary file at a time. **NOTE:** Writing to file also serves as a commit point to the dictionary generated.Increasing more values in memory causes more data loss during system or application failure.It is advised to alter this configuration judiciously. |
 | dictionary.worker.threads | 1 | CarbonData supports Optimized data loading by relying on a dictionary server. Dictionary server helps to maintain dictionary values independent of the data loading and there by avoids reading the same input data multiples times. This configuration determines the number of concurrent dictionary generation or request that needs to be served by the dictionary server. **NOTE:** This configuration takes effect when ***carbon.options.single.pass*** is configured as true.Please refer to *carbon.options.single.pass*to understand how dictionary server optimizes data loading. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/docs/ddl-of-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 933a448..96335c6 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -33,7 +33,9 @@ CarbonData DDL statements are documented here,which includes:
   * [Hive/Parquet folder Structure](#support-flat-folder-same-as-hiveparquet)
   * [Extra Long String columns](#string-longer-than-32000-characters)
   * [Compression for Table](#compression-for-table)
-  * [Bad Records Path](#bad-records-path)
+  * [Bad Records Path](#bad-records-path) 
+  * [Load Minimum Input File Size](#load-minimum-data-size) 
+
 * [CREATE TABLE AS SELECT](#create-table-as-select)
 * [CREATE EXTERNAL TABLE](#create-external-table)
   * [External Table on Transactional table location](#create-external-table-on-managed-table-data-location)
@@ -104,6 +106,7 @@ CarbonData DDL statements are documented here,which includes:
 | [LONG_STRING_COLUMNS](#string-longer-than-32000-characters)  | Columns which are greater than 32K characters                |
 | [BUCKETNUMBER](#bucketing)                                   | Number of buckets to be created                              |
 | [BUCKETCOLUMNS](#bucketing)                                  | Columns which are to be placed in buckets                    |
+| [LOAD_MIN_SIZE_INMB](#load-minimum-data-size)                | Minimum input data size per node for data loading          |
 
  Following are the guidelines for TBLPROPERTIES, CarbonData's additional table options can be set via carbon.properties.
 
@@ -474,7 +477,19 @@ CarbonData DDL statements are documented here,which includes:
      be later viewed in table description for reference.
 
      ```
-       TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords'')
+       TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords')
+     ```
+     
+   - ##### Load minimum data size
+     This property indicates the minimum input data size per node for data loading.
+     By default it is not enabled. Setting a positive integer value will enable this feature; a zero or negative value leaves it disabled.
+     This property is useful if you have a large cluster and only want a small portion of the nodes to process data loading.
+     For example, suppose you have a cluster with 10 nodes and about 1GB of input data. Without this property, each node will process about 100MB of input data, resulting in at least 10 data files. With this property set to 512, only 2 nodes will be chosen to process the input data, each with about 512MB of input, resulting in about 2 or 4 files depending on the compression ratio.
+     Moreover, this property can also be specified in the load option.
+     Note that once you enable this feature, CarbonData will, for load balancing, ignore data locality while assigning input data to nodes; this will cause more network traffic.
+
+     ```
+       TBLPROPERTIES('LOAD_MIN_SIZE_INMB'='256')
      ```
 
 ## CREATE TABLE AS SELECT

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLoadMinSize.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLoadMinSize.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLoadMinSize.scala
index ebb4e32..c18ce91 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLoadMinSize.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLoadMinSize.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.constants.CarbonLoadOptionConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
@@ -37,6 +36,9 @@ class TestTableLoadMinSize extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS table_loadminsize1")
     sql("DROP TABLE IF EXISTS table_loadminsize2")
     sql("DROP TABLE IF EXISTS table_loadminsize3")
+    sql("DROP TABLE IF EXISTS table_loadminsize4")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
   }
 
   test("Value test: set table load min size in not int value") {
@@ -49,12 +51,6 @@ class TestTableLoadMinSize extends QueryTest with BeforeAndAfterAll {
         TBLPROPERTIES('table_blocksize'='128 MB')
       """)
 
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")
-
     sql(s"""
            LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize1 OPTIONS('load_min_size_inmb'='256 MB')
            """)
@@ -81,12 +77,6 @@ class TestTableLoadMinSize extends QueryTest with BeforeAndAfterAll {
         TBLPROPERTIES('table_blocksize'='128 MB')
       """)
 
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")
-
     sql(s"""
            LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize2 OPTIONS('load_min_size_inmb'='256')
            """)
@@ -114,12 +104,6 @@ class TestTableLoadMinSize extends QueryTest with BeforeAndAfterAll {
         TBLPROPERTIES('table_blocksize'='128 MB')
       """)
 
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")
-
     sql(s"""
            LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize3
            """)
@@ -136,14 +120,50 @@ class TestTableLoadMinSize extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  test("Function test:: set load_min_size_inmb to table property") {
+
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS table_loadminsize4
+        (ID Int, date Timestamp, country String,
+        name String, phonetype String, serialname String, salary Int)
+        STORED BY 'org.apache.carbondata.format'
+        TBLPROPERTIES('table_blocksize'='128 MB', 'load_min_size_inmb'='256')
+      """)
+
+    sql(
+      """
+        desc formatted table_loadminsize4
+      """).show(false)
+
+    sql(
+      """
+        alter table table_loadminsize4 set TBLPROPERTIES('load_min_size_inmb'='512')
+      """).show(false)
+
+    sql(s"""
+           LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize4
+           """)
+
+    checkAnswer(
+      sql("""
+           SELECT country, count(salary) AS amount
+           FROM table_loadminsize4
+           WHERE country IN ('china','france')
+           GROUP BY country
+          """),
+      Seq(Row("china", 96), Row("france", 1))
+    )
+
+  }
+
 
   override def afterAll {
     sql("DROP TABLE IF EXISTS table_loadminsize1")
     sql("DROP TABLE IF EXISTS table_loadminsize2")
     sql("DROP TABLE IF EXISTS table_loadminsize3")
+    sql("DROP TABLE IF EXISTS table_loadminsize4")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    CarbonProperties.getInstance()
-      .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE_DEFAULT)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 7071295..21b166f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -833,4 +833,32 @@ object CommonUtil {
       })
     }
   }
+
+  /**
+   * This method will validate single node minimum load data volume of table specified by the user
+   *
+   * @param tableProperties table property specified by user
+   * @param propertyName property name
+   */
+  def validateLoadMinSize(tableProperties: Map[String, String], propertyName: String): Unit = {
+    var size: Integer = 0
+    if (tableProperties.get(propertyName).isDefined) {
+      val loadSizeStr: String =
+        parsePropertyValueStringInMB(tableProperties(propertyName))
+      try {
+        size = Integer.parseInt(loadSizeStr)
+      } catch {
+        case e: NumberFormatException =>
+          throw new MalformedCarbonCommandException(s"Invalid $propertyName value found: " +
+                                                    s"$loadSizeStr, only int value greater " +
+                                                    s"than 0 is supported.")
+      }
+      // if the value is not greater than 0 (zero or negative), fall back to the default value
+      if(size > 0) {
+        tableProperties.put(propertyName, loadSizeStr)
+      } else {
+        tableProperties.put(propertyName, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 36be655..107a303 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -429,6 +429,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     CommonUtil.validateTableLevelCompactionProperties(tableProperties)
     // validate flat folder property.
     CommonUtil.validateFlatFolder(tableProperties)
+    // validate load_min_size_inmb property
+    CommonUtil.validateLoadMinSize(tableProperties,
+      CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)
 
     TableModel(
       ifNotExistPresent,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4f42139..2d0bc58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1171,21 +1171,27 @@ object CarbonDataRDDFactory {
       .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
     val skewedDataOptimization = CarbonProperties.getInstance()
       .isLoadSkewedDataOptimizationEnabled()
-    val loadMinSizeOptimization = CarbonProperties.getInstance()
-      .isLoadMinSizeOptimizationEnabled()
     // get user ddl input the node loads the smallest amount of data
-    val expectedMinSizePerNode = carbonLoadModel.getLoadMinSize()
-    val blockAssignStrategy = if (skewedDataOptimization) {
-      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST
-    } else if (loadMinSizeOptimization) {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    var loadMinSize = carbonLoadModel.getLoadMinSize()
+    if (loadMinSize.equalsIgnoreCase(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)) {
+      loadMinSize = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
+          CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)
+    }
+
+    val blockAssignStrategy = if (!loadMinSize.equalsIgnoreCase(
+      CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)) {
       CarbonLoaderUtil.BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST
+    } else if (skewedDataOptimization) {
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST
     } else {
       CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST
     }
     LOGGER.info(s"Allocating block to nodes using strategy: $blockAssignStrategy")
 
     val nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(blockList.toSeq.asJava, -1,
-      activeNodes.toList.asJava, blockAssignStrategy, expectedMinSizePerNode).asScala.toSeq
+      activeNodes.toList.asJava, blockAssignStrategy, loadMinSize).asScala.toSeq
     val timeElapsed: Long = System.currentTimeMillis - startTime
     LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
     LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 029c0e3..b513c1f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -123,6 +123,12 @@ private[sql] case class CarbonDescribeFormattedCommand(
         tblProps.get(CarbonCommonConstants.LONG_STRING_COLUMNS), ""))
     }
 
+    // load min size info
+    if (tblProps.containsKey(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)) {
+      results ++= Seq(("Minimum input data size per node for data loading",
+        tblProps.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB), ""))
+    }
+
     var isLocalDictEnabled = tblProps.asScala
       .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
     if (isLocalDictEnabled.isDefined) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 20cffa7..27443a8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -326,6 +326,9 @@ object AlterTableUtil {
       // validate the local dictionary properties
       validateLocalDictionaryProperties(lowerCasePropertiesMap, tblPropertiesMap, carbonTable)
 
+      // validate the load min size properties
+      validateLoadMinSizeProperties(carbonTable, lowerCasePropertiesMap)
+
       // below map will be used for cache invalidation. As tblProperties map is getting modified
       // in the next few steps the original map need to be retained for any decision making
       val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*)
@@ -399,7 +402,8 @@ object AlterTableUtil {
       "LOCAL_DICTIONARY_ENABLE",
       "LOCAL_DICTIONARY_THRESHOLD",
       "LOCAL_DICTIONARY_INCLUDE",
-      "LOCAL_DICTIONARY_EXCLUDE")
+      "LOCAL_DICTIONARY_EXCLUDE",
+      "LOAD_MIN_SIZE_INMB")
     supportedOptions.contains(propKey.toUpperCase)
   }
 
@@ -748,4 +752,18 @@ object AlterTableUtil {
       false
     }
   }
+
+  private def validateLoadMinSizeProperties(carbonTable: CarbonTable,
+      propertiesMap: mutable.Map[String, String]): Unit = {
+    // validate load min size property
+    if (propertiesMap.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB).isDefined) {
+      // load min size is not allowed for child tables and dataMaps
+      if (carbonTable.isChildDataMap) {
+        throw new MalformedCarbonCommandException(s"Table property ${
+          CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB} is not allowed for child datamaps")
+      }
+      CommonUtil.validateLoadMinSize(propertiesMap,
+        CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 7fecb12..03b2645 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -1,406 +1,423 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.model;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.Maps;
-import org.apache.carbondata.common.Strings;
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.constants.LoggerAction;
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
-import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
-import org.apache.carbondata.processing.util.TableOptionConstant;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-
-/**
- * Builder for {@link CarbonLoadModel}
- */
-@InterfaceAudience.Internal
-public class CarbonLoadModelBuilder {
-  private static final Logger LOGGER = LogServiceFactory.getLogService(
-      CarbonLoadModelBuilder.class.getName());
-  private CarbonTable table;
-
-  public CarbonLoadModelBuilder(CarbonTable table) {
-    this.table = table;
-  }
-
-  /**
-   * build CarbonLoadModel for data loading
-   * @param options Load options from user input
-   * @param taskNo
-   * @return a new CarbonLoadModel instance
-   */
-  public CarbonLoadModel build(Map<String, String>  options, long timestamp, String taskNo)
-      throws InvalidLoadOptionException, IOException {
-    Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
-
-    if (!options.containsKey("fileheader")) {
-      List<CarbonColumn> csvHeader = table.getCreateOrderColumn(table.getTableName());
-      String[] columns = new String[csvHeader.size()];
-      for (int i = 0; i < columns.length; i++) {
-        columns[i] = csvHeader.get(i).getColName();
-      }
-      optionsFinal.put("fileheader", Strings.mkString(columns, ","));
-    }
-    optionsFinal.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options, table));
-    optionsFinal.put("sort_scope",
-        Maps.getOrDefault(options, "sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT));
-    CarbonLoadModel model = new CarbonLoadModel();
-    model.setCarbonTransactionalTable(table.isTransactionalTable());
-    model.setFactTimeStamp(timestamp);
-    model.setTaskNo(taskNo);
-
-    // we have provided 'fileheader', so it hadoopConf can be null
-    build(options, optionsFinal, model, null);
-    String timestampFormat = options.get("timestampformat");
-    if (timestampFormat == null) {
-      timestampFormat = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-              CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
-    }
-    String dateFormat = options.get("dateFormat");
-    if (dateFormat == null) {
-      dateFormat = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
-              CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
-    }
-    model.setDateFormat(dateFormat);
-    model.setTimestampformat(timestampFormat);
-    model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false")));
-    model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null));
-    try {
-      model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options, "dictport", "-1")));
-    } catch (NumberFormatException e) {
-      throw new InvalidLoadOptionException(e.getMessage());
-    }
-    validateAndSetColumnCompressor(model);
-    return model;
-  }
-
-  /**
-   * build CarbonLoadModel for data loading
-   * @param options Load options from user input
-   * @param optionsFinal Load options that populated with default values for optional options
-   * @param carbonLoadModel The output load model
-   * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
-   *                   user provided load options
-   */
-  public void build(
-      Map<String, String> options,
-      Map<String, String> optionsFinal,
-      CarbonLoadModel carbonLoadModel,
-      Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
-    build(options, optionsFinal, carbonLoadModel, hadoopConf, new HashMap<String, String>(), false);
-  }
-
-  /**
-   * build CarbonLoadModel for data loading
-   * @param options Load options from user input
-   * @param optionsFinal Load options that populated with default values for optional options
-   * @param carbonLoadModel The output load model
-   * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
-   *                   user provided load options
-   * @param partitions partition name map to path
-   * @param isDataFrame true if build for load for dataframe
-   */
-  public void build(
-      Map<String, String> options,
-      Map<String, String> optionsFinal,
-      CarbonLoadModel carbonLoadModel,
-      Configuration hadoopConf,
-      Map<String, String> partitions,
-      boolean isDataFrame) throws InvalidLoadOptionException, IOException {
-    carbonLoadModel.setTableName(table.getTableName());
-    carbonLoadModel.setDatabaseName(table.getDatabaseName());
-    carbonLoadModel.setTablePath(table.getTablePath());
-    carbonLoadModel.setTableName(table.getTableName());
-    carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable());
-    CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table);
-    // Need to fill dimension relation
-    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema);
-    String sort_scope = optionsFinal.get("sort_scope");
-    String single_pass = optionsFinal.get("single_pass");
-    String bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable");
-    String bad_records_action = optionsFinal.get("bad_records_action");
-    String bad_record_path = optionsFinal.get("bad_record_path");
-    String global_sort_partitions = optionsFinal.get("global_sort_partitions");
-    String timestampformat = optionsFinal.get("timestampformat");
-    String dateFormat = optionsFinal.get("dateformat");
-    String delimeter = optionsFinal.get("delimiter");
-    String complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1");
-    String complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2");
-    String all_dictionary_path = optionsFinal.get("all_dictionary_path");
-    String column_dict = optionsFinal.get("columndict");
-    validateDateTimeFormat(timestampformat, "TimestampFormat");
-    validateDateTimeFormat(dateFormat, "DateFormat");
-    validateSortScope(sort_scope);
-
-    if (Boolean.parseBoolean(bad_records_logger_enable) ||
-        LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
-      if (!StringUtils.isEmpty(bad_record_path)) {
-        bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path);
-      } else {
-        throw new InvalidLoadOptionException(
-            "Cannot redirect bad records as bad record location is not provided.");
-      }
-    }
-
-    carbonLoadModel.setBadRecordsLocation(bad_record_path);
-
-    validateGlobalSortPartitions(global_sort_partitions);
-    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar"), "\\"));
-    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar"), "\""));
-    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar"), "#"));
-
-    // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
-    // we should use table schema to generate file header.
-    String fileHeader = optionsFinal.get("fileheader");
-    String headerOption = options.get("header");
-    if (headerOption != null) {
-      if (!headerOption.equalsIgnoreCase("true") &&
-          !headerOption.equalsIgnoreCase("false")) {
-        throw new InvalidLoadOptionException(
-            "'header' option should be either 'true' or 'false'.");
-      }
-      // whether the csv file has file header, the default value is true
-      if (Boolean.valueOf(headerOption)) {
-        if (!StringUtils.isEmpty(fileHeader)) {
-          throw new InvalidLoadOptionException(
-              "When 'header' option is true, 'fileheader' option is not required.");
-        }
-      } else {
-        if (StringUtils.isEmpty(fileHeader)) {
-          List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
-          String[] columnNames = new String[columns.size()];
-          for (int i = 0; i < columnNames.length; i++) {
-            columnNames[i] = columns.get(i).getColName();
-          }
-          fileHeader = Strings.mkString(columnNames, ",");
-        }
-      }
-    }
-
-    carbonLoadModel.setTimestampformat(timestampformat);
-    carbonLoadModel.setDateFormat(dateFormat);
-    carbonLoadModel.setDefaultTimestampFormat(
-        CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
-
-    carbonLoadModel.setDefaultDateFormat(
-        CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.CARBON_DATE_FORMAT,
-            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
-
-    carbonLoadModel.setSerializationNullFormat(
-        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," +
-            optionsFinal.get("serialization_null_format"));
-
-    carbonLoadModel.setBadRecordsLoggerEnable(
-        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + bad_records_logger_enable);
-
-    carbonLoadModel.setBadRecordsAction(
-        TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + bad_records_action.toUpperCase());
-
-    carbonLoadModel.setIsEmptyDataBadRecord(
-        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
-            optionsFinal.get("is_empty_data_bad_record"));
-
-    carbonLoadModel.setSkipEmptyLine(optionsFinal.get("skip_empty_line"));
-
-    carbonLoadModel.setSortScope(sort_scope);
-    carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb"));
-    carbonLoadModel.setGlobalSortPartitions(global_sort_partitions);
-    carbonLoadModel.setUseOnePass(Boolean.parseBoolean(single_pass));
-
-    if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
-        complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
-        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
-      throw new InvalidLoadOptionException("Field Delimiter and Complex types delimiter are same");
-    } else {
-      carbonLoadModel.setComplexDelimiterLevel1(complex_delimeter_level1);
-      carbonLoadModel.setComplexDelimiterLevel2(complex_delimeter_level2);
-    }
-    // set local dictionary path, and dictionary file extension
-    carbonLoadModel.setAllDictPath(all_dictionary_path);
-    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
-    carbonLoadModel.setCsvHeader(fileHeader);
-    carbonLoadModel.setColDictFilePath(column_dict);
-
-    List<String> ignoreColumns = new ArrayList<>();
-    if (!isDataFrame) {
-      for (Map.Entry<String, String> partition : partitions.entrySet()) {
-        if (partition.getValue() != null) {
-          ignoreColumns.add(partition.getKey());
-        }
-      }
-    }
-
-    carbonLoadModel.setCsvHeaderColumns(
-        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns));
-
-    int validatedMaxColumns = validateMaxColumns(
-        carbonLoadModel.getCsvHeaderColumns(),
-        optionsFinal.get("maxcolumns"));
-
-    carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns));
-    if (carbonLoadModel.isCarbonTransactionalTable()) {
-      carbonLoadModel.readAndSetLoadMetadataDetails();
-    }
-    carbonLoadModel.setSortColumnsBoundsStr(optionsFinal.get("sort_column_bounds"));
-    carbonLoadModel.setLoadMinSize(
-        optionsFinal.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB));
-
-    validateAndSetColumnCompressor(carbonLoadModel);
-  }
-
-  private int validateMaxColumns(String[] csvHeaders, String maxColumns)
-      throws InvalidLoadOptionException {
-    /*
-    User configures both csvheadercolumns, maxcolumns,
-      if csvheadercolumns >= maxcolumns, give error
-      if maxcolumns > threashold, give error
-    User configures csvheadercolumns
-      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1
-      if csvheadercolumns >= threashold, give error
-    User configures nothing
-      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1
-      if csvheadercolumns >= threashold, give error
-     */
-    int columnCountInSchema = csvHeaders.length;
-    int maxNumberOfColumnsForParsing = 0;
-    Integer maxColumnsInt = getMaxColumnValue(maxColumns);
-    if (maxColumnsInt != null) {
-      if (columnCountInSchema >= maxColumnsInt) {
-        throw new InvalidLoadOptionException(
-            "csv headers should be less than the max columns " + maxColumnsInt);
-      } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
-        throw new InvalidLoadOptionException(
-            "max columns cannot be greater than the threshold value: " +
-                CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
-      } else {
-        maxNumberOfColumnsForParsing = maxColumnsInt;
-      }
-    } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
-      throw new InvalidLoadOptionException(
-          "csv header columns should be less than max threashold: " +
-              CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
-    } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
-      maxNumberOfColumnsForParsing = columnCountInSchema + 1;
-    } else {
-      maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING;
-    }
-    return maxNumberOfColumnsForParsing;
-  }
-
-  private Integer getMaxColumnValue(String maxColumn) {
-    return (maxColumn == null) ? null : Integer.parseInt(maxColumn);
-  }
-
-  /**
-   * validates both timestamp and date for illegal values
-   */
-  private void validateDateTimeFormat(String dateTimeLoadFormat, String dateTimeLoadOption)
-      throws InvalidLoadOptionException {
-    // allowing empty value to be configured for dateformat option.
-    if (dateTimeLoadFormat != null && !dateTimeLoadFormat.trim().equalsIgnoreCase("")) {
-      try {
-        new SimpleDateFormat(dateTimeLoadFormat);
-      } catch (IllegalArgumentException e) {
-        throw new InvalidLoadOptionException(
-            "Error: Wrong option: " + dateTimeLoadFormat + " is provided for option "
-                + dateTimeLoadOption);
-      }
-    }
-  }
-
-  private void validateSortScope(String sortScope) throws InvalidLoadOptionException {
-    if (sortScope != null) {
-      // We support global sort for Hive standard partition, but don't support
-      // global sort for other partition type.
-      if (table.getPartitionInfo(table.getTableName()) != null &&
-          !table.isHivePartitionTable() &&
-          sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString())) {
-        throw new InvalidLoadOptionException("Don't support use global sort on "
-            + table.getPartitionInfo().getPartitionType() +  " partition table.");
-      }
-    }
-  }
-
-  private void validateGlobalSortPartitions(String globalSortPartitions)
-      throws InvalidLoadOptionException {
-    if (globalSortPartitions != null) {
-      try {
-        int num = Integer.parseInt(globalSortPartitions);
-        if (num <= 0) {
-          throw new InvalidLoadOptionException("'GLOBAL_SORT_PARTITIONS' should be greater than 0");
-        }
-      } catch (NumberFormatException e) {
-        throw new InvalidLoadOptionException(e.getMessage());
-      }
-    }
-  }
-
-  private void validateAndSetColumnCompressor(CarbonLoadModel carbonLoadModel)
-      throws InvalidLoadOptionException {
-    try {
-      String columnCompressor = carbonLoadModel.getColumnCompressor();
-      if (StringUtils.isBlank(columnCompressor)) {
-        columnCompressor = CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR);
-      }
-      // check and load compressor
-      CompressorFactory.getInstance().getCompressor(columnCompressor);
-      carbonLoadModel.setColumnCompressor(columnCompressor);
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new InvalidLoadOptionException("Failed to load the compressor");
-    }
-  }
-
-  /**
-   * check whether using default value or not
-   */
-  private String checkDefaultValue(String value, String defaultValue) {
-    if (StringUtils.isEmpty(value)) {
-      return defaultValue;
-    } else {
-      return value;
-    }
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.Strings;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * Builder for {@link CarbonLoadModel}
+ */
+@InterfaceAudience.Internal
+public class CarbonLoadModelBuilder {
+  private static final Logger LOGGER = LogServiceFactory.getLogService(
+      CarbonLoadModelBuilder.class.getName());
+  private CarbonTable table;
+
+  public CarbonLoadModelBuilder(CarbonTable table) {
+    this.table = table;
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @param taskNo
+   * @return a new CarbonLoadModel instance
+   */
+  public CarbonLoadModel build(Map<String, String>  options, long timestamp, String taskNo)
+      throws InvalidLoadOptionException, IOException {
+    Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
+
+    if (!options.containsKey("fileheader")) {
+      List<CarbonColumn> csvHeader = table.getCreateOrderColumn(table.getTableName());
+      String[] columns = new String[csvHeader.size()];
+      for (int i = 0; i < columns.length; i++) {
+        columns[i] = csvHeader.get(i).getColName();
+      }
+      optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+    }
+    optionsFinal.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options, table));
+    optionsFinal.put("sort_scope",
+        Maps.getOrDefault(options, "sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT));
+    CarbonLoadModel model = new CarbonLoadModel();
+    model.setCarbonTransactionalTable(table.isTransactionalTable());
+    model.setFactTimeStamp(timestamp);
+    model.setTaskNo(taskNo);
+
+    // we have provided 'fileheader', so hadoopConf can be null
+    build(options, optionsFinal, model, null);
+    String timestampFormat = options.get("timestampformat");
+    if (timestampFormat == null) {
+      timestampFormat = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+              CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+    }
+    String dateFormat = options.get("dateFormat");
+    if (dateFormat == null) {
+      dateFormat = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+              CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+    }
+    model.setDateFormat(dateFormat);
+    model.setTimestampformat(timestampFormat);
+    model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false")));
+    model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null));
+    try {
+      model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options, "dictport", "-1")));
+    } catch (NumberFormatException e) {
+      throw new InvalidLoadOptionException(e.getMessage());
+    }
+    validateAndSetColumnCompressor(model);
+    return model;
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @param optionsFinal Load options populated with default values for optional options
+   * @param carbonLoadModel The output load model
+   * @param hadoopConf hadoopConf is needed to read CSV header if 'fileheader' is not set in
+   *                   user provided load options
+   */
+  public void build(
+      Map<String, String> options,
+      Map<String, String> optionsFinal,
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
+    build(options, optionsFinal, carbonLoadModel, hadoopConf, new HashMap<String, String>(), false);
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @param optionsFinal Load options populated with default values for optional options
+   * @param carbonLoadModel The output load model
+   * @param hadoopConf hadoopConf is needed to read CSV header if 'fileheader' is not set in
+   *                   user provided load options
+   * @param partitions partition name map to path
+   * @param isDataFrame true if build for load for dataframe
+   */
+  public void build(
+      Map<String, String> options,
+      Map<String, String> optionsFinal,
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf,
+      Map<String, String> partitions,
+      boolean isDataFrame) throws InvalidLoadOptionException, IOException {
+    carbonLoadModel.setTableName(table.getTableName());
+    carbonLoadModel.setDatabaseName(table.getDatabaseName());
+    carbonLoadModel.setTablePath(table.getTablePath());
+    carbonLoadModel.setTableName(table.getTableName());
+    carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable());
+    CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table);
+    // Need to fill dimension relation
+    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema);
+    String sort_scope = optionsFinal.get("sort_scope");
+    String single_pass = optionsFinal.get("single_pass");
+    String bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable");
+    String bad_records_action = optionsFinal.get("bad_records_action");
+    String bad_record_path = optionsFinal.get("bad_record_path");
+    String global_sort_partitions = optionsFinal.get("global_sort_partitions");
+    String timestampformat = optionsFinal.get("timestampformat");
+    String dateFormat = optionsFinal.get("dateformat");
+    String delimeter = optionsFinal.get("delimiter");
+    String complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1");
+    String complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2");
+    String all_dictionary_path = optionsFinal.get("all_dictionary_path");
+    String column_dict = optionsFinal.get("columndict");
+    validateDateTimeFormat(timestampformat, "TimestampFormat");
+    validateDateTimeFormat(dateFormat, "DateFormat");
+    validateSortScope(sort_scope);
+
+    if (Boolean.parseBoolean(bad_records_logger_enable) ||
+        LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+      if (!StringUtils.isEmpty(bad_record_path)) {
+        bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path);
+      } else {
+        throw new InvalidLoadOptionException(
+            "Cannot redirect bad records as bad record location is not provided.");
+      }
+    }
+
+    carbonLoadModel.setBadRecordsLocation(bad_record_path);
+
+    validateGlobalSortPartitions(global_sort_partitions);
+    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar"), "\\"));
+    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar"), "\""));
+    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar"), "#"));
+
+    // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
+    // we should use table schema to generate file header.
+    String fileHeader = optionsFinal.get("fileheader");
+    String headerOption = options.get("header");
+    if (headerOption != null) {
+      if (!headerOption.equalsIgnoreCase("true") &&
+          !headerOption.equalsIgnoreCase("false")) {
+        throw new InvalidLoadOptionException(
+            "'header' option should be either 'true' or 'false'.");
+      }
+      // whether the csv file has file header, the default value is true
+      if (Boolean.valueOf(headerOption)) {
+        if (!StringUtils.isEmpty(fileHeader)) {
+          throw new InvalidLoadOptionException(
+              "When 'header' option is true, 'fileheader' option is not required.");
+        }
+      } else {
+        if (StringUtils.isEmpty(fileHeader)) {
+          List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
+          String[] columnNames = new String[columns.size()];
+          for (int i = 0; i < columnNames.length; i++) {
+            columnNames[i] = columns.get(i).getColName();
+          }
+          fileHeader = Strings.mkString(columnNames, ",");
+        }
+      }
+    }
+
+    carbonLoadModel.setTimestampformat(timestampformat);
+    carbonLoadModel.setDateFormat(dateFormat);
+    carbonLoadModel.setDefaultTimestampFormat(
+        CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+
+    carbonLoadModel.setDefaultDateFormat(
+        CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+
+    carbonLoadModel.setSerializationNullFormat(
+        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," +
+            optionsFinal.get("serialization_null_format"));
+
+    carbonLoadModel.setBadRecordsLoggerEnable(
+        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + bad_records_logger_enable);
+
+    carbonLoadModel.setBadRecordsAction(
+        TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + bad_records_action.toUpperCase());
+
+    carbonLoadModel.setIsEmptyDataBadRecord(
+        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+            optionsFinal.get("is_empty_data_bad_record"));
+
+    carbonLoadModel.setSkipEmptyLine(optionsFinal.get("skip_empty_line"));
+
+    carbonLoadModel.setSortScope(sort_scope);
+    carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb"));
+    carbonLoadModel.setGlobalSortPartitions(global_sort_partitions);
+    carbonLoadModel.setUseOnePass(Boolean.parseBoolean(single_pass));
+
+    if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
+        complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
+        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+      throw new InvalidLoadOptionException("Field Delimiter and Complex types delimiter are same");
+    } else {
+      carbonLoadModel.setComplexDelimiterLevel1(complex_delimeter_level1);
+      carbonLoadModel.setComplexDelimiterLevel2(complex_delimeter_level2);
+    }
+    // set local dictionary path, and dictionary file extension
+    carbonLoadModel.setAllDictPath(all_dictionary_path);
+    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
+    carbonLoadModel.setCsvHeader(fileHeader);
+    carbonLoadModel.setColDictFilePath(column_dict);
+
+    List<String> ignoreColumns = new ArrayList<>();
+    if (!isDataFrame) {
+      for (Map.Entry<String, String> partition : partitions.entrySet()) {
+        if (partition.getValue() != null) {
+          ignoreColumns.add(partition.getKey());
+        }
+      }
+    }
+
+    carbonLoadModel.setCsvHeaderColumns(
+        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns));
+
+    int validatedMaxColumns = validateMaxColumns(
+        carbonLoadModel.getCsvHeaderColumns(),
+        optionsFinal.get("maxcolumns"));
+
+    carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns));
+    if (carbonLoadModel.isCarbonTransactionalTable()) {
+      carbonLoadModel.readAndSetLoadMetadataDetails();
+    }
+    carbonLoadModel.setSortColumnsBoundsStr(optionsFinal.get("sort_column_bounds"));
+    carbonLoadModel.setLoadMinSize(
+        optionsFinal.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB));
+    validateAndSetLoadMinSize(carbonLoadModel);
+
+    validateAndSetColumnCompressor(carbonLoadModel);
+  }
+
+  /**
+   * Resolves the maximum number of columns to allow while parsing the CSV input.
+   *
+   * <pre>
+   * 'maxcolumns' is configured:
+   *   csv header columns >= maxcolumns  : error
+   *   maxcolumns > threshold            : error
+   *   otherwise                         : maxcolumns
+   * 'maxcolumns' is not configured:
+   *   csv header columns >= threshold   : error
+   *   csv header columns >= default     : csv header columns + 1
+   *   otherwise                         : default
+   * </pre>
+   *
+   * @param csvHeaders header columns of the input
+   * @param maxColumns value of the 'maxcolumns' option; null is treated as not configured
+   * @return the number of columns to allow while parsing
+   * @throws InvalidLoadOptionException if 'maxcolumns' is not an integer or a limit is violated
+   */
+  private int validateMaxColumns(String[] csvHeaders, String maxColumns)
+      throws InvalidLoadOptionException {
+    int columnCountInSchema = csvHeaders.length;
+    Integer maxColumnsInt;
+    try {
+      maxColumnsInt = getMaxColumnValue(maxColumns);
+    } catch (NumberFormatException e) {
+      // Report a malformed value like any other invalid load option (see
+      // validateGlobalSortPartitions) instead of letting the unchecked exception escape.
+      throw new InvalidLoadOptionException(
+          "'maxcolumns' should be an integer, but got: " + maxColumns);
+    }
+
+    if (maxColumnsInt == null) {
+      // nothing configured: derive the limit from the header width
+      if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+        throw new InvalidLoadOptionException(
+            "csv header columns should be less than max threashold: " +
+                CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+      }
+      if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+        return columnCountInSchema + 1;
+      }
+      return CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING;
+    }
+
+    if (columnCountInSchema >= maxColumnsInt) {
+      throw new InvalidLoadOptionException(
+          "csv headers should be less than the max columns " + maxColumnsInt);
+    }
+    if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+      throw new InvalidLoadOptionException(
+          "max columns cannot be greater than the threshold value: " +
+              CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+    }
+    return maxColumnsInt;
+  }
+
+  /**
+   * Converts the textual 'maxcolumns' value to a number.
+   *
+   * @param maxColumn the value to convert, may be null
+   * @return the parsed value, or null when no value was given
+   * @throws NumberFormatException if the value is not a parsable integer
+   */
+  private Integer getMaxColumnValue(String maxColumn) {
+    if (maxColumn == null) {
+      return null;
+    }
+    return Integer.valueOf(maxColumn);
+  }
+
+  /**
+   * Checks that a date or timestamp format given as load option is a legal pattern.
+   * A null or blank format is accepted without further checks.
+   *
+   * @param dateTimeLoadFormat the pattern to check
+   * @param dateTimeLoadOption name of the option the pattern was given for, used in the error
+   * @throws InvalidLoadOptionException if the pattern is rejected by SimpleDateFormat
+   */
+  private void validateDateTimeFormat(String dateTimeLoadFormat, String dateTimeLoadOption)
+      throws InvalidLoadOptionException {
+    // an empty value is allowed to be configured for the format option
+    if (dateTimeLoadFormat == null || dateTimeLoadFormat.trim().isEmpty()) {
+      return;
+    }
+    try {
+      new SimpleDateFormat(dateTimeLoadFormat);
+    } catch (IllegalArgumentException e) {
+      String message = "Error: Wrong option: " + dateTimeLoadFormat + " is provided for option "
+          + dateTimeLoadOption;
+      throw new InvalidLoadOptionException(message);
+    }
+  }
+
+  /**
+   * Rejects global sort on partition tables that are not Hive standard partition tables.
+   *
+   * @param sortScope the requested sort scope; null skips the check
+   * @throws InvalidLoadOptionException if global sort is requested for an unsupported
+   *                                    partition table
+   */
+  private void validateSortScope(String sortScope) throws InvalidLoadOptionException {
+    if (sortScope == null) {
+      return;
+    }
+    // Global sort is supported for Hive standard partition, but not for any other
+    // partition type.
+    boolean globalSortOnUnsupportedPartition =
+        table.getPartitionInfo(table.getTableName()) != null
+            && !table.isHivePartitionTable()
+            && sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString());
+    if (globalSortOnUnsupportedPartition) {
+      throw new InvalidLoadOptionException("Don't support use global sort on "
+          + table.getPartitionInfo().getPartitionType() +  " partition table.");
+    }
+  }
+
+  /**
+   * Checks that the number of global sort partitions, when given, is a positive integer.
+   *
+   * @param globalSortPartitions the configured value; null skips the check
+   * @throws InvalidLoadOptionException if the value is not an integer or is not greater than 0
+   */
+  private void validateGlobalSortPartitions(String globalSortPartitions)
+      throws InvalidLoadOptionException {
+    if (globalSortPartitions == null) {
+      return;
+    }
+    final int partitionCount;
+    try {
+      partitionCount = Integer.parseInt(globalSortPartitions);
+    } catch (NumberFormatException e) {
+      throw new InvalidLoadOptionException(e.getMessage());
+    }
+    if (partitionCount <= 0) {
+      throw new InvalidLoadOptionException("'GLOBAL_SORT_PARTITIONS' should be greater than 0");
+    }
+  }
+
+  /**
+   * Determines the column compressor for the load, asks the compressor factory for it and
+   * stores the resolved name back on the model. When the model carries no compressor name,
+   * the value of the COMPRESSOR property (or its default) is used.
+   *
+   * @param carbonLoadModel model to read the compressor from and to update
+   * @throws InvalidLoadOptionException if any step of resolving or loading the compressor fails
+   */
+  private void validateAndSetColumnCompressor(CarbonLoadModel carbonLoadModel)
+      throws InvalidLoadOptionException {
+    try {
+      String configured = carbonLoadModel.getColumnCompressor();
+      String effective = StringUtils.isBlank(configured)
+          ? CarbonProperties.getInstance().getProperty(
+              CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR)
+          : configured;
+      // check and load compressor; the lookup result itself is not needed here
+      CompressorFactory.getInstance().getCompressor(effective);
+      carbonLoadModel.setColumnCompressor(effective);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new InvalidLoadOptionException("Failed to load the compressor");
+    }
+  }
+
+  /**
+   * Returns the given value, or the default when StringUtils.isEmpty reports the value as
+   * empty.
+   *
+   * @param value the configured value
+   * @param defaultValue the fallback to use
+   * @return value or defaultValue
+   */
+  private String checkDefaultValue(String value, String defaultValue) {
+    return StringUtils.isEmpty(value) ? defaultValue : value;
+  }
+
+  /**
+   * Validates the load min size stored on the model. A value that parses to a positive
+   * integer is kept as it is; anything else (missing, not an integer, zero or negative) is
+   * replaced by CARBON_LOAD_MIN_SIZE_INMB_DEFAULT.
+   *
+   * @param carbonLoadModel model to read the load min size from and to update
+   */
+  private void validateAndSetLoadMinSize(CarbonLoadModel carbonLoadModel) {
+    String loadMinSize = carbonLoadModel.getLoadMinSize();
+    int size;
+    try {
+      size = Integer.parseInt(loadMinSize);
+    } catch (NumberFormatException ignored) {
+      // a missing or non-numeric value is handled like a non-positive one below
+      size = 0;
+    }
+    if (size > 0) {
+      carbonLoadModel.setLoadMinSize(loadMinSize);
+    } else {
+      // not a usable size: fall back to the default
+      carbonLoadModel.setLoadMinSize(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index a1dee27..b53976a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -186,8 +186,8 @@ public class LoadOption {
     optionsFinal.put("sort_scope", "local_sort");
     optionsFinal.put("sort_column_bounds", Maps.getOrDefault(options, "sort_column_bounds", ""));
     optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
-        Maps.getOrDefault(options,CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
-            CarbonCommonConstants.CARBON_LOAD_MIN_NODE_SIZE_INMB_DEFAULT));
+        Maps.getOrDefault(options, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
+            CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT));
     return optionsFinal;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2c517e3/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 98d3576..f1e1d9e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -81,7 +81,7 @@ public final class CarbonLoaderUtil {
   public enum BlockAssignmentStrategy {
     BLOCK_NUM_FIRST("Assign blocks to node base on number of blocks"),
     BLOCK_SIZE_FIRST("Assign blocks to node base on data size of blocks"),
-    NODE_MIN_SIZE_FIRST("Assign blocks to node base on minumun size of inputs");
+    NODE_MIN_SIZE_FIRST("Assign blocks to node base on minimum size of inputs");
     private String name;
     BlockAssignmentStrategy(String name) {
       this.name = name;
@@ -546,7 +546,7 @@ public final class CarbonLoaderUtil {
    * @param noOfNodesInput -1 if number of nodes has to be decided
    *                       based on block location information
    * @param blockAssignmentStrategy strategy used to assign blocks
-   * @param loadMinSize the property load_min_size_inmb specified by the user
+   * @param expectedMinSizePerNode the property load_min_size_inmb specified by the user
    * @return a map that maps node to blocks
    */
   public static Map<String, List<Distributable>> nodeBlockMapping(