You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/15 11:50:17 UTC

[12/42] carbondata git commit: Added batch sort to load options and added test cases

Added batch sort to load options and added test cases

Added sort_scope to load options

rebase

rebase


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d734f530
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d734f530
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d734f530

Branch: refs/heads/branch-1.1
Commit: d734f53006308a675af30acefa798c814ada3329
Parents: 211c23b
Author: ravipesala <ra...@gmail.com>
Authored: Thu May 11 23:54:30 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 15 12:56:20 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  10 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  16 +-
 .../dataload/TestBatchSortDataLoad.scala        | 230 +++++++++++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  11 +-
 .../execution/command/carbonTableSchema.scala   |   4 +
 .../execution/command/carbonTableSchema.scala   |   4 +
 .../DataLoadFailAllTypeSortTest.scala           |  27 ++-
 .../processing/model/CarbonLoadModel.java       |  30 ++-
 .../newflow/DataLoadProcessBuilder.java         |  12 +-
 .../newflow/sort/SortScopeOptions.java          |  63 +++++
 .../processing/newflow/sort/SorterFactory.java  |   7 +-
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |   5 +-
 .../sortandgroupby/sortdata/SortParameters.java |  13 ++
 .../util/CarbonDataProcessorUtil.java           |  51 ++++
 14 files changed, 449 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 269a75f..e1f3e9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1104,15 +1104,15 @@ public final class CarbonCommonConstants {
   /**
    * Sorts the data in batches and writes the batch data to store with index file.
    */
-  public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort";
+  public static final String LOAD_SORT_SCOPE = "carbon.load.sort.scope";
 
   /**
-   * If set to true, the sorting scope is smaller and more index tree will be created,
+   * If set to BATCH_SORT, the sorting scope is smaller and more index trees will be created,
    * thus loading is faster but query maybe slower.
-   * If set to false, the sorting scope is bigger and one index tree per data node will be created,
-   * thus loading is slower but query is faster.
+   * If set to LOCAL_SORT, the sorting scope is bigger and one index tree per data node will be
+   * created, thus loading is slower but query is faster.
    */
-  public static final String LOAD_USE_BATCH_SORT_DEFAULT = "false";
+  public static final String LOAD_SORT_SCOPE_DEFAULT = "LOCAL_SORT";
 
   /**
    * Size of batch data to keep in memory, as a thumb rule it supposed

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 0dcaba2..08661a2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.internal.index.Block;
@@ -84,7 +85,11 @@ public class CarbonInputSplit extends FileSplit
       ColumnarFormatVersion version) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
-    this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+    String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+    if (taskNo.contains("_")) {
+      taskNo = taskNo.split("_")[0];
+    }
+    this.taskId = taskNo;
     this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName());
     this.invalidSegments = new ArrayList<>();
     this.version = version;
@@ -237,10 +242,11 @@ public class CarbonInputSplit extends FileSplit
     String filePath1 = this.getPath().getName();
     String filePath2 = other.getPath().getName();
     if (CarbonTablePath.isCarbonDataFile(filePath1)) {
-      int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
-      int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
-      if (firstTaskId != otherTaskId) {
-        return firstTaskId - otherTaskId;
+      byte[] firstTaskId = CarbonTablePath.DataFileUtil.getTaskNo(filePath1).getBytes();
+      byte[] otherTaskId = CarbonTablePath.DataFileUtil.getTaskNo(filePath2).getBytes();
+      int compare = ByteUtil.compare(firstTaskId, otherTaskId);
+      if (compare != 0) {
+        return compare;
       }
 
       int firstBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath1));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
new file mode 100644
index 0000000..70007c6
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import java.io.{BufferedWriter, File, FileWriter, FilenameFilter}
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.Row
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
+  var filePath: String = _
+
+
+  def buildTestData() = {
+    filePath = s"${integrationPath}/spark-common-test/target/big.csv"
+    val file = new File(filePath)
+    val writer = new BufferedWriter(new FileWriter(file))
+    writer.write("c1,c2,c3, c4, c5, c6, c7, c8, c9, c10")
+    writer.newLine()
+    for(i <- 0 until 200000) {
+      writer.write("a" + i%1000 + "," +
+                   "b" + i%1000 + "," +
+                   "c" + i%1000 + "," +
+                   "d" + i%1000 + "," +
+                   "e" + i%1000 + "," +
+                   "f" + i%1000 + "," +
+                   i%1000 + "," +
+                   i%1000 + "," +
+                   i%1000 + "," +
+                   i%1000 + "\n")
+      if ( i % 10000 == 0) {
+        writer.flush()
+      }
+    }
+    writer.close()
+  }
+
+  def dropTable() = {
+    sql("DROP TABLE IF EXISTS carbon_load1")
+    sql("DROP TABLE IF EXISTS carbon_load2")
+    sql("DROP TABLE IF EXISTS carbon_load3")
+    sql("DROP TABLE IF EXISTS carbon_load4")
+    sql("DROP TABLE IF EXISTS carbon_load5")
+    sql("DROP TABLE IF EXISTS carbon_load6")
+  }
+
+
+
+  override def beforeAll {
+    dropTable
+    buildTestData
+  }
+
+
+
+  test("test batch sort load by passing option to load command") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load1(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+
+    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(200000)))
+
+    assert(getIndexfileCount("carbon_load1") == 12, "Something wrong in batch sort")
+  }
+
+  test("test batch sort load by passing option to load command and compare with normal load") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load2(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load2 ")
+
+    checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"),
+      sql("select * from carbon_load2 where c1='a1' order by c1"))
+
+  }
+
+  test("test batch sort load by passing option and compaction") {
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql("alter table carbon_load1 compact 'major'")
+    Thread.sleep(4000)
+    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(800000)))
+
+    assert(getIndexfileCount("carbon_load1", "0.1") == 1, "Something wrong in compaction after batch sort")
+
+  }
+
+  test("test batch sort load by passing option in one load and with out option in other load and then do compaction") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load5(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 ")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 ")
+
+    checkAnswer(sql("select count(*) from carbon_load5"), Seq(Row(800000)))
+
+    checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"),
+      sql("select * from carbon_load5 where c1='a1' order by c1"))
+
+    sql("alter table carbon_load5 compact 'major'")
+    Thread.sleep(4000)
+
+    assert(getIndexfileCount("carbon_load5", "0.1") == 1,
+      "Something wrong in compaction after batch sort")
+
+    checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"),
+      sql("select * from carbon_load5 where c1='a1' order by c1"))
+
+  }
+
+  test("test batch sort load by passing option with single pass") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load3(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load3 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1', 'single_pass'='true')")
+
+    checkAnswer(sql("select count(*) from carbon_load3"), Seq(Row(200000)))
+
+    assert(getIndexfileCount("carbon_load3") == 12, "Something wrong in batch sort")
+
+    checkAnswer(sql("select * from carbon_load3 where c1='a1' order by c1"),
+      sql("select * from carbon_load2 where c1='a1' order by c1"))
+
+  }
+
+  test("test batch sort load by with out passing option but through carbon properties") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "1")
+    sql(
+      """
+        | CREATE TABLE carbon_load4(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load4 " )
+
+    checkAnswer(sql("select count(*) from carbon_load4"), Seq(Row(200000)))
+
+    assert(getIndexfileCount("carbon_load4") == 12, "Something wrong in batch sort")
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0")
+  }
+
+  test("test batch sort load by with out passing option but through carbon properties with default size") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT")
+    sql(
+      """
+        | CREATE TABLE carbon_load6(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load6 " )
+
+    checkAnswer(sql("select count(*) from carbon_load6"), Seq(Row(200000)))
+
+    assert(getIndexfileCount("carbon_load6") == 1, "Something wrong in batch sort")
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+  }
+
+  def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = {
+    val store  = storeLocation +"/default/"+ tableName + "/Fact/Part0/Segment_"+segmentNo
+    val list = new File(store).list(new FilenameFilter {
+      override def accept(dir: File, name: String) = name.endsWith(".carbonindex")
+    })
+    list.size
+  }
+
+  override def afterAll {
+    dropTable
+    new File(filePath).delete()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index afc4a58..a701c72 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.processing.constants.LoggerAction
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -753,7 +754,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT",
-      "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD"
+      "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder
@@ -808,6 +809,14 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       }
     }
 
+    if (options.exists(_._1.equalsIgnoreCase("SORT_SCOPE"))) {
+      val optionValue: String = options.get("sort_scope").get.head._2
+      if (!SortScopeOptions.isValidSortOption(optionValue)) {
+        throw new MalformedCarbonCommandException(
+          "option SORT_SCOPE can have option either BATCH_SORT or LOCAL_SORT or GLOBAL_SORT")
+      }
+    }
+
     // check for duplicate options
     val duplicateOptions = options filter {
       case (_, optionlist) => optionlist.size > 1

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 1192e08..494beff 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -405,6 +405,8 @@ case class LoadTable(
       val dateFormat = options.getOrElse("dateformat", null)
       validateDateFormat(dateFormat, table)
       val maxColumns = options.getOrElse("maxcolumns", null)
+      val sortScope = options.getOrElse("sort_scope", null)
+      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
 
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
@@ -428,6 +430,8 @@ case class LoadTable(
       carbonLoadModel
         .setIsEmptyDataBadRecord(
           DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
+      carbonLoadModel.setSortScope(sortScope)
+      carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
       // when single_pass=true, and not use all dict
       val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
         case "true" =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index e2405f2..09824d8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -417,6 +417,8 @@ case class LoadTable(
       val dateFormat = options.getOrElse("dateformat", null)
       validateDateFormat(dateFormat, table)
       val maxColumns = options.getOrElse("maxcolumns", null)
+      val sortScope = options.getOrElse("sort_scope", null)
+      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
@@ -439,6 +441,8 @@ case class LoadTable(
       carbonLoadModel
         .setIsEmptyDataBadRecord(
           DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
+      carbonLoadModel.setSortScope(sortScope)
+      carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
       val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
         case "true" =>
           true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index 0465aa7..5e91574 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -116,9 +116,9 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
       CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+        .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "batch_sort")
       CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
+        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
       sql("create table data_bm(name String, dob long, weight int) " +
           "STORED BY 'org.apache.carbondata.format'")
       val testData = s"$resourcesPath/badrecords/dummy.csv"
@@ -132,7 +132,8 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
     }
     finally {
       CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+        .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
@@ -148,9 +149,9 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
       CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+        .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT")
       CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE");
+        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
       sql("create table data_bmf(name String, dob long, weight int) " +
           "STORED BY 'org.apache.carbondata.format'")
       val testData = s"$resourcesPath/badrecords/dummy.csv"
@@ -166,10 +167,11 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
     }
     finally {
       CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+        .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
     }
   }
 
@@ -182,7 +184,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
       CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+        .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT")
       sql("create table data_bm_no_good_data(name String, dob long, weight int) " +
           "STORED BY 'org.apache.carbondata.format'")
       val testData = s"$resourcesPath/badrecords/dummy2.csv"
@@ -198,10 +200,11 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
     }
     finally {
       CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+        .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
     }
   }
 
@@ -214,7 +217,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
       CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
+        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
       sql("create table data_tbm(name String, dob long, weight int) " +
           "USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='4', " +
           "'bucketcolumns'='name', 'tableName'='data_tbm')")
@@ -232,7 +235,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
     finally {
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index d8f84bf..3a2e2eb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -171,7 +171,15 @@ public class CarbonLoadModel implements Serializable {
    */
   private boolean preFetch;
 
-  private String numberOfcolumns;
+  /**
+   * Sort scope to use for this load, e.g. BATCH_SORT or LOCAL_SORT
+   */
+  private String sortScope;
+
+  /**
+   * Batch sort size in mb.
+   */
+  private String batchSortSizeInMb;
   /**
    * get escape char
    *
@@ -391,6 +399,8 @@ public class CarbonLoadModel implements Serializable {
     copy.dictionaryServerPort = dictionaryServerPort;
     copy.preFetch = preFetch;
     copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
+    copy.sortScope = sortScope;
+    copy.batchSortSizeInMb = batchSortSizeInMb;
     return copy;
   }
 
@@ -442,6 +452,8 @@ public class CarbonLoadModel implements Serializable {
     copyObj.dictionaryServerPort = dictionaryServerPort;
     copyObj.preFetch = preFetch;
     copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
+    copyObj.sortScope = sortScope;
+    copyObj.batchSortSizeInMb = batchSortSizeInMb;
     return copyObj;
   }
 
@@ -773,4 +785,20 @@ public class CarbonLoadModel implements Serializable {
   public void setIsEmptyDataBadRecord(String isEmptyDataBadRecord) {
     this.isEmptyDataBadRecord = isEmptyDataBadRecord;
   }
+
+  public String getSortScope() {
+    return sortScope;
+  }
+
+  public void setSortScope(String sortScope) {
+    this.sortScope = sortScope;
+  }
+
+  public String getBatchSortSizeInMb() {
+    return batchSortSizeInMb;
+  }
+
+  public void setBatchSortSizeInMb(String batchSortSizeInMb) {
+    this.batchSortSizeInMb = batchSortSizeInMb;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 8865518..5c7c035 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -35,6 +35,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
+import org.apache.carbondata.processing.newflow.steps.CarbonRowDataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl;
@@ -53,14 +55,12 @@ public final class DataLoadProcessBuilder {
 
   public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
       CarbonIterator[] inputIterators) throws Exception {
-    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
-            CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
     CarbonDataLoadConfiguration configuration =
         createConfiguration(loadModel, storeLocation);
+    SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
     if (configuration.getBucketingInfo() != null) {
       return buildInternalForBucketing(inputIterators, configuration);
-    } else if (batchSort) {
+    } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
       return buildInternalForBatchSort(inputIterators, configuration);
     } else {
       return buildInternal(inputIterators, configuration);
@@ -158,6 +158,10 @@ public final class DataLoadProcessBuilder {
         loadModel.getIsEmptyDataBadRecord().split(",")[1]);
     configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
         loadModel.getFactFilePath());
+    configuration
+        .setDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, loadModel.getSortScope());
+    configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        loadModel.getBatchSortSizeInMb());
     CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getFactTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
new file mode 100644
index 0000000..f2534db
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Sort scope options
+ */
+public class SortScopeOptions {
+
+  public static SortScope getSortScope(String sortScope) {
+    if (sortScope == null) {
+      sortScope = CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT;
+    }
+    switch (sortScope.toUpperCase()) {
+      case "BATCH_SORT":
+        return SortScope.BATCH_SORT;
+      case "LOCAL_SORT":
+        return SortScope.LOCAL_SORT;
+      case "NO_SORT":
+        return SortScope.NO_SORT;
+      default:
+        return SortScope.LOCAL_SORT;
+    }
+  }
+
+  public static boolean isValidSortOption(String sortScope) {
+    if (sortScope == null) {
+      return false;
+    }
+    switch (sortScope.toUpperCase()) {
+      case "BATCH_SORT":
+        return true;
+      case "LOCAL_SORT":
+        return true;
+      case "NO_SORT":
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  public enum SortScope {
+    NO_SORT, BATCH_SORT, LOCAL_SORT;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
index 60cca69..39a21ad 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorte
 import org.apache.carbondata.processing.newflow.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
 import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
 import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 public class SorterFactory {
 
@@ -39,9 +40,7 @@ public class SorterFactory {
     boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
             CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
-    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
-            CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
+    SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
     Sorter sorter;
     if (offheapsort) {
       if (configuration.getBucketingInfo() != null) {
@@ -58,7 +57,7 @@ public class SorterFactory {
         sorter = new ParallelReadMergeSorterImpl(counter);
       }
     }
-    if (batchSort) {
+    if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
       if (configuration.getBucketingInfo() == null) {
         sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index df3825a..898b73d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -98,9 +98,10 @@ public class UnsafeSortDataRows {
         .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
             CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
 
-    this.maxSizeAllowed = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0"));
+    this.maxSizeAllowed = parameters.getBatchSortSizeinMb();
     if (maxSizeAllowed <= 0) {
+      // If user does not input any memory size, then take half the size of usable memory configured
+      // in sort memory size.
       this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
     } else {
       this.maxSizeAllowed = this.maxSizeAllowed * 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 3c3a9d8..07149f7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -114,6 +114,8 @@ public class SortParameters {
 
   private int numberOfCores;
 
+  private int batchSortSizeinMb;
+
   public SortParameters getCopy() {
     SortParameters parameters = new SortParameters();
     parameters.tempFileLocation = tempFileLocation;
@@ -138,6 +140,7 @@ public class SortParameters {
     parameters.taskNo = taskNo;
     parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
     parameters.numberOfCores = numberOfCores;
+    parameters.batchSortSizeinMb = batchSortSizeinMb;
     return parameters;
   }
 
@@ -317,6 +320,14 @@ public class SortParameters {
     this.numberOfCores = numberOfCores;
   }
 
+  public int getBatchSortSizeinMb() {
+    return batchSortSizeinMb; // size in MB, per the name; getCopy carries it over as well
+  }
+
+  public void setBatchSortSizeinMb(int batchSortSizeinMb) {
+    this.batchSortSizeinMb = batchSortSizeinMb; // createSortParameters fills this in
+  }
+
   public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
     SortParameters parameters = new SortParameters();
     CarbonTableIdentifier tableIdentifier =
@@ -334,6 +345,8 @@ public class SortParameters {
     parameters.setComplexDimColCount(configuration.getComplexDimensionCount());
     parameters.setNoDictionaryDimnesionColumn(
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+    parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
+
     parameters.setObserver(new SortObserver());
     // get sort buffer size
     parameters.setSortBufferSize(Integer.parseInt(carbonProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 41bfbed..a4de24e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -56,8 +56,10 @@ import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
 
 import org.apache.commons.lang3.ArrayUtils;
 
@@ -522,4 +524,53 @@ public final class CarbonDataProcessorUtil {
     return aggType;
   }
 
+  /**
+   * Resolves the sort scope for a load: the load option when set, else the carbon property.
+   * @param configuration data load configuration carrying the load options
+   * @return the resolved sort scope; the default scope if resolution fails
+   */
+  public static SortScopeOptions.SortScope getSortScope(CarbonDataLoadConfiguration configuration) {
+    SortScopeOptions.SortScope sortScope;
+    try {
+      // read the load option once; it wins over the carbon property when present
+      Object loadOption = configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE);
+      if (loadOption == null) {
+        sortScope = SortScopeOptions.getSortScope(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+                CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT));
+      } else {
+        sortScope = SortScopeOptions.getSortScope(loadOption.toString());
+      }
+    } catch (Exception e) {
+      // best effort: fall back to the default scope, but record why the lookup failed
+      sortScope = SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
+      LOGGER.warn("sort scope is set to " + sortScope + " : " + e.getMessage());
+    }
+    return sortScope;
+  }
+
+  /**
+   * Get the batch sort size in MB: the load option when set, else the carbon property.
+   * @param configuration data load configuration carrying the load options
+   * @return the configured size in MB, or 0 when it is not set or cannot be parsed
+   */
+  public static int getBatchSortSizeinMb(CarbonDataLoadConfiguration configuration) {
+    int batchSortSizeInMb;
+    try {
+      Object loadOption =
+          configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB);
+      if (loadOption == null) {
+        batchSortSizeInMb = Integer.parseInt(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0"));
+      } else {
+        batchSortSizeInMb = Integer.parseInt(loadOption.toString());
+      }
+    } catch (Exception e) {
+      // best effort: an unreadable size falls back to 0, but say so instead of hiding it
+      batchSortSizeInMb = 0;
+      LOGGER.warn("batch sort size is set to " + batchSortSizeInMb + " : " + e.getMessage());
+    }
+    return batchSortSizeInMb;
+  }
+
 }
\ No newline at end of file