Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/28 14:14:43 UTC

[44/49] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading

[CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading

Enhance data loading performance by specifying sort column bounds
1. Add a row range number during the convert process step
2. Dispatch rows to each sorter by range number
3. The sort/write process steps can then run concurrently within each range
   (see the sketch below)
4. Since all sort temp files are written into one folder, we add the range
   number to the file name to distinguish them
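
A minimal, self-contained sketch of steps 1-3 (all names here are illustrative,
not the actual pipeline classes):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Tag each row with a range id, collect rows per range, then sort each
    // range on its own; the ranges are disjoint, so the per-range sort/write
    // work can proceed concurrently.
    final class PerRangeSortSketch {
      public static void main(String[] args) {
        int[] bounds = {400, 800, 1200, 1600};       // 4 bounds -> 5 ranges
        List<List<Integer>> rowsOfRange = new ArrayList<>();
        for (int i = 0; i <= bounds.length; i++) {
          rowsOfRange.add(new ArrayList<Integer>());
        }
        for (int row : new int[] {1700, 42, 950, 401}) {
          int rangeId = 0;                           // step 1: compute range id
          while (rangeId < bounds.length && row > bounds[rangeId]) {
            rangeId++;
          }
          rowsOfRange.get(rangeId).add(row);         // step 2: dispatch by range
        }
        // step 3: each range can be sorted independently of the others
        rowsOfRange.parallelStream().forEach(Collections::sort);
        System.out.println(rowsOfRange);  // [[42], [401], [950], [], [1700]]
      }
    }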

Tests added and docs updated

This closes #1953


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2414fb0c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2414fb0c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2414fb0c

Branch: refs/heads/carbonstore-rebase5
Commit: 2414fb0c09c103a3d8eac75d46ff33b35f5df6d7
Parents: a44a1f4
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Tue Feb 13 10:58:06 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Feb 28 22:05:23 2018 +0800

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  10 +
 .../core/datastore/row/CarbonRow.java           |  10 +-
 .../ThriftWrapperSchemaConverterImpl.java       |   2 +-
 .../core/metadata/schema/BucketingInfo.java     |  24 +-
 .../core/metadata/schema/ColumnRangeInfo.java   |  29 ++
 .../metadata/schema/SortColumnRangeInfo.java    |  83 +++++
 docs/data-management-on-carbondata.md           |  11 +
 .../TestLoadDataWithSortColumnBounds.scala      | 348 +++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   2 +-
 .../carbondata/spark/rdd/PartitionDropper.scala |   2 +-
 .../spark/rdd/PartitionSplitter.scala           |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   2 +-
 .../loading/CarbonDataLoadConfiguration.java    |  11 +
 .../loading/DataLoadProcessBuilder.java         |  77 +++-
 .../loading/converter/RowConverter.java         |   2 +-
 .../converter/impl/RowConverterImpl.java        |   5 +
 .../loading/model/CarbonLoadModel.java          |  14 +
 .../loading/model/CarbonLoadModelBuilder.java   |   1 +
 .../processing/loading/model/LoadOption.java    |   1 +
 .../partition/impl/HashPartitionerImpl.java     |  10 +-
 .../partition/impl/RangePartitionerImpl.java    |  71 ++++
 .../partition/impl/RawRowComparator.java        |  63 ++++
 .../processing/loading/sort/SorterFactory.java  |  16 +-
 ...arallelReadMergeSorterWithBucketingImpl.java | 272 ---------------
 ...allelReadMergeSorterWithColumnRangeImpl.java | 289 +++++++++++++++
 ...arallelReadMergeSorterWithBucketingImpl.java | 263 --------------
 ...allelReadMergeSorterWithColumnRangeImpl.java | 293 ++++++++++++++++
 .../loading/sort/unsafe/UnsafeSortDataRows.java |   6 +-
 .../unsafe/merger/UnsafeIntermediateMerger.java |   6 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  11 +-
 .../steps/DataConverterProcessorStepImpl.java   | 102 +++++-
 ...ConverterProcessorWithBucketingStepImpl.java | 161 ---------
 .../steps/DataWriterProcessorStepImpl.java      |  70 +++-
 .../SingleThreadFinalSortFilesMerger.java       |   3 +-
 .../processing/sort/sortdata/SortDataRows.java  |  11 +-
 .../sortdata/SortIntermediateFileMerger.java    |   6 +-
 .../sort/sortdata/SortParameters.java           |  10 +
 .../store/CarbonFactDataHandlerColumnar.java    |   6 +-
 39 files changed, 1558 insertions(+), 750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index a6bf60f..8ff8dc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -124,4 +124,14 @@ public final class CarbonLoadOptionConstants {
   public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
       = "carbon.load.skewedDataOptimization.enabled";
   public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
+
+  /**
+   * field delimiter for each field in one bound
+   */
+  public static final String SORT_COLUMN_BOUNDS_FIELD_DELIMITER = ",";
+
+  /**
+   * row delimiter between the sort column bounds
+   */
+  public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
 }
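
For reference, a quick sketch of how these two delimiters carve up a bounds
string (it mirrors the StringUtils.splitPreserveAllTokens calls in
DataLoadProcessBuilder further below; the bounds string itself is made up).
Note that splitPreserveAllTokens keeps empty tokens, which is how an empty
field value in a bound ends up being treated as null:

    import java.util.Arrays;

    import org.apache.commons.lang3.StringUtils;

    public final class BoundsSplitSketch {
      public static void main(String[] args) {
        String boundsStr = "400,aab1;,aab1";
        // ';' (SORT_COLUMN_BOUNDS_ROW_DELIMITER) separates the bounds ...
        String[] bounds = StringUtils.splitPreserveAllTokens(boundsStr, ";", -1);
        for (String bound : bounds) {
          // ... and ',' (SORT_COLUMN_BOUNDS_FIELD_DELIMITER) separates the
          // field values within one bound
          String[] fields = StringUtils.splitPreserveAllTokens(bound, ",", -1);
          System.out.println(Arrays.toString(fields));
        }
        // prints [400, aab1] and then [, aab1] -- the empty token is kept
      }
    }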

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index 8702421..bb624af 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -29,7 +29,7 @@ public class CarbonRow implements Serializable {
 
   private Object[] rawData;
 
-  public short bucketNumber;
+  private short rangeId;
 
   public CarbonRow(Object[] data) {
     this.data = data;
@@ -83,4 +83,12 @@ public class CarbonRow implements Serializable {
   public void setRawData(Object[] rawData) {
     this.rawData = rawData;
   }
+
+  public short getRangeId() {
+    return rangeId;
+  }
+
+  public void setRangeId(short rangeId) {
+    this.rangeId = rangeId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 21ab797..a15804e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -293,7 +293,7 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       thriftColumnSchema.add(fromWrapperToExternalColumnSchema(wrapperColumnSchema));
     }
     return new org.apache.carbondata.format.BucketingInfo(thriftColumnSchema,
-        bucketingInfo.getNumberOfBuckets());
+        bucketingInfo.getNumOfRanges());
   }
 
   /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java
index 569241d..e24f0f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java
@@ -24,40 +24,41 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
  * Bucketing information
  */
-public class BucketingInfo implements Serializable, Writable {
-
+@InterfaceAudience.Internal
+public class BucketingInfo implements ColumnRangeInfo, Serializable, Writable {
   private static final long serialVersionUID = -0L;
-
   private List<ColumnSchema> listOfColumns;
-
-  private int numberOfBuckets;
+  // number of value ranges
+  private int numOfRanges;
 
   public BucketingInfo() {
 
   }
 
-  public BucketingInfo(List<ColumnSchema> listOfColumns, int numberOfBuckets) {
+  public BucketingInfo(List<ColumnSchema> listOfColumns, int numberOfRanges) {
     this.listOfColumns = listOfColumns;
-    this.numberOfBuckets = numberOfBuckets;
+    this.numOfRanges = numberOfRanges;
   }
 
   public List<ColumnSchema> getListOfColumns() {
     return listOfColumns;
   }
 
-  public int getNumberOfBuckets() {
-    return numberOfBuckets;
+  @Override
+  public int getNumOfRanges() {
+    return numOfRanges;
   }
 
   @Override
   public void write(DataOutput output) throws IOException {
-    output.writeInt(numberOfBuckets);
+    output.writeInt(numOfRanges);
     output.writeInt(listOfColumns.size());
     for (ColumnSchema aColSchema : listOfColumns) {
       aColSchema.write(output);
@@ -66,7 +67,7 @@ public class BucketingInfo implements Serializable, Writable {
 
   @Override
   public void readFields(DataInput input) throws IOException {
-    this.numberOfBuckets = input.readInt();
+    this.numOfRanges = input.readInt();
     int colSchemaSize = input.readInt();
     this.listOfColumns = new ArrayList<>(colSchemaSize);
     for (int i = 0; i < colSchemaSize; i++) {
@@ -75,5 +76,4 @@ public class BucketingInfo implements Serializable, Writable {
       this.listOfColumns.add(aSchema);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/core/src/main/java/org/apache/carbondata/core/metadata/schema/ColumnRangeInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/ColumnRangeInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/ColumnRangeInfo.java
new file mode 100644
index 0000000..c5454b2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/ColumnRangeInfo.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * interface for column range information. Currently we treat bucket and sort_column_range as
+ * value ranges for a column.
+ */
+@InterfaceAudience.Internal
+public interface ColumnRangeInfo {
+  int getNumOfRanges();
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/core/src/main/java/org/apache/carbondata/core/metadata/schema/SortColumnRangeInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SortColumnRangeInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SortColumnRangeInfo.java
new file mode 100644
index 0000000..9d2460a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SortColumnRangeInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * column ranges specified by sort column bounds
+ */
+@InterfaceAudience.Internal
+public class SortColumnRangeInfo implements ColumnRangeInfo, Serializable {
+  private static final long serialVersionUID = 1L;
+  // indices for the sort columns in the raw row
+  private int[] sortColumnIndex;
+  // is the sort column no dictionary encoded
+  private boolean[] isSortColumnNoDict;
+  // each literal sort column bounds specified by user
+  private String[] userSpecifiedRanges;
+  // separator for the field values in each bound
+  private String separator;
+  // number of value ranges for the columns
+  private int numOfRanges;
+
+  public SortColumnRangeInfo(int[] sortColumnIndex, boolean[] isSortColumnNoDict,
+      String[] userSpecifiedRanges, String separator) {
+    this.sortColumnIndex = sortColumnIndex;
+    this.isSortColumnNoDict = isSortColumnNoDict;
+    this.userSpecifiedRanges = userSpecifiedRanges;
+    this.separator = separator;
+    this.numOfRanges = userSpecifiedRanges.length + 1;
+  }
+
+  public int[] getSortColumnIndex() {
+    return sortColumnIndex;
+  }
+
+  public boolean[] getIsSortColumnNoDict() {
+    return isSortColumnNoDict;
+  }
+
+  public String[] getUserSpecifiedRanges() {
+    return userSpecifiedRanges;
+  }
+
+  public String getSeparator() {
+    return separator;
+  }
+
+  @Override
+  public int getNumOfRanges() {
+    return numOfRanges;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("SortColumnRangeInfo{");
+    sb.append("sortColumnIndex=").append(Arrays.toString(sortColumnIndex));
+    sb.append(", isSortColumnNoDict=").append(Arrays.toString(isSortColumnNoDict));
+    sb.append(", userSpecifiedRanges=").append(Arrays.toString(userSpecifiedRanges));
+    sb.append(", separator='").append(separator).append('\'');
+    sb.append(", numOfRanges=").append(numOfRanges);
+    sb.append('}');
+    return sb.toString();
+  }
+}
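
A construction sketch for reference (the column indices and dictionary flags
below are hypothetical; only the constructor shape comes from the class above).
As the constructor shows, N user-specified bounds always yield N + 1 ranges:

    import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;

    public final class SortRangeSketch {
      public static void main(String[] args) {
        SortColumnRangeInfo info = new SortColumnRangeInfo(
            new int[] {0, 3},             // hypothetical sort column indices
            new boolean[] {false, true},  // hypothetical no-dictionary flags
            new String[] {"400,aab1", "800,aab1"},  // 2 bounds ...
            ",");
        System.out.println(info.getNumOfRanges()); // ... give 3 ranges
      }
    }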

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/docs/data-management-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md
index 9678a32..e71e489 100644
--- a/docs/data-management-on-carbondata.md
+++ b/docs/data-management-on-carbondata.md
@@ -419,6 +419,17 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
     ```
     NOTE: Date formats are specified by date pattern strings. The date pattern letters in CarbonData are same as in JAVA. Refer to [SimpleDateFormat](http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html).
 
+  - **SORT COLUMN BOUNDS:** Range bounds for sort columns.
+
+    ```
+    OPTIONS('SORT_COLUMN_BOUNDS'='v11,v21,v31;v12,v22,v32;v13,v23,v33')
+    ```
+    NOTE:
+    * SORT_COLUMN_BOUNDS will be used only when the SORT_SCOPE is 'local_sort'.
+    * Bounds are separated by ';' and the field values within a bound are separated by ','.
+    * CarbonData will use these bounds as ranges to process the data concurrently.
+    * Since the actual order and the literal order of a dictionary column are not necessarily the same, we do not recommend using this feature if the first sort column is 'dictionary_include'.
+
   - **SINGLE_PASS:** Single Pass Loading enables single job to finish data loading with dictionary generation on the fly. It enhances performance in the scenarios where the subsequent data loading after initial load involves fewer incremental updates on the dictionary.
 
   This option specifies whether to use single pass for loading data or not. By default this option is set to FALSE.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSortColumnBounds.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSortColumnBounds.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSortColumnBounds.scala
new file mode 100644
index 0000000..1f171b8
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSortColumnBounds.scala
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.io.{File, FileOutputStream, OutputStreamWriter, Serializable}
+
+import scala.util.Random
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+case class SortColumnBoundRow (id: Int, date: String, country: String, name: String,
+    phoneType: String, serialName: String, salary: Int) extends Serializable
+
+object TestLoadDataWithSortColumnBounds {
+  def generateOneRow(id : Int): SortColumnBoundRow = {
+    SortColumnBoundRow(id,
+      "2015/7/23",
+      s"country$id",
+      s"name$id",
+      s"phone${new Random().nextInt(10000)}",
+      s"ASD${new Random().nextInt(10000)}",
+      10000 + id)
+  }
+}
+
+class TestLoadDataWithSortColumnBounds extends QueryTest with BeforeAndAfterAll {
+  private val tableName: String = "test_table_with_sort_column_bounds"
+  private val filePath: String = s"$resourcesPath/source_for_sort_column_bounds.csv"
+  private var df: DataFrame = _
+
+  private val dateFormatStr: String = "yyyy/MM/dd"
+  private val totalLineNum = 2000
+
+  private val originDateStatus: String = CarbonProperties.getInstance().getProperty(
+    CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+
+
+  override def beforeAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, dateFormatStr)
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    prepareDataFile()
+    prepareDataFrame()
+  }
+
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, originDateStatus)
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    new File(filePath).delete()
+    df = null
+  }
+
+  /**
+   * generate the data file for loading; modeled on source.csv but may contain more lines
+   */
+  private def prepareDataFile(): Unit = {
+    val file = new File(filePath)
+
+    val sb: StringBuilder = new StringBuilder
+    def generateLine(id : Int): String = {
+      sb.clear()
+      val row = TestLoadDataWithSortColumnBounds.generateOneRow(id)
+      sb.append(row.id).append(',')
+        .append(row.date).append(',')
+        .append(row.country).append(',')
+        .append(row.name).append(',')
+        .append(row.phoneType).append(',')
+        .append(row.serialName).append(',')
+        .append(row.salary)
+        .append(System.lineSeparator())
+        .toString()
+    }
+
+    val outputStream = new FileOutputStream(file)
+    val writer = new OutputStreamWriter(outputStream)
+    for (i <- 1 to totalLineNum) {
+      writer.write(generateLine(i))
+    }
+
+    writer.flush()
+    writer.close()
+    outputStream.flush()
+    outputStream.close()
+  }
+
+  /**
+   * prepare data frame
+   */
+  private def prepareDataFrame(): Unit = {
+    import sqlContext.implicits._
+    df = sqlContext.sparkSession.sparkContext.parallelize(1 to totalLineNum)
+      .map(id => {
+        val row = TestLoadDataWithSortColumnBounds.generateOneRow(id)
+        (row.id, row.date, row.country, row.name, row.phoneType, row.serialName, row.salary)
+      })
+      .toDF("ID", "date", "country", "name", "phoneType", "serialName", "salary")
+  }
+
+  test("load data with sort column bounds: safe mode") {
+    val originStatus = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT, CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    // load with 4 bounds
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='400,aab1;800,aab1;1200,aab1;1600,aab1')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT, originStatus)
+  }
+
+  test("load data with sort column bounds: unsafe mode") {
+    val originStatus = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT, CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    // load with 4 bounds
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='400,aab1;800,aab1;1200,aab1;1600,aab1')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT, originStatus)
+  }
+
+  test("load data with sort column bounds: empty column value in bounds is treated as null") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    // bounds have empty value
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='200,aab1;,aab1')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: sort column bounds will be ignored if it is empty.") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: number of column value in bounds should match that of sort column") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    val e = intercept[Exception] {
+      // number of column values does not match that of sort columns
+      sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+          s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+          s" 'sort_column_bounds'='400,aab1;800')")
+    }
+
+    assert(e.getMessage.contains(
+      "The number of field in bounds should be equal to that in sort columns." +
+      " Expected 2, actual 1." +
+      " The illegal bound is '800'"))
+
+    val e2 = intercept[Exception] {
+      // number of column values does not match that of sort columns
+      sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+          s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+          s" 'sort_column_bounds'='400,aab1;800,aab1,def')")
+    }
+
+    assert(e2.getMessage.contains(
+      "The number of field in bounds should be equal to that in sort columns." +
+      " Expected 2, actual 3." +
+      " The illegal bound is '800,aab1,def'"))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: sort column bounds will be ignored if not using local_sort") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata'" +
+        "tblproperties('sort_columns'='ID,name','sort_scope'='global_sort')")
+    // since the sort_scope is 'global_sort', the sort column bounds will be ignored,
+    // so the error in the sort column bounds will not be thrown
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='400,aab,extra_field')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: no sort columns explicitly specified" +
+       " means all dimension columns will be sort columns, so bounds should be set correctly") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata'")
+    // the sort_columns will have 5 columns if we don't specify it explicitly
+    val e = intercept[Exception] {
+      sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+          s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+          s" 'sort_column_bounds'='400,aab')")
+    }
+    assert(e.getMessage.contains(
+      "The number of field in bounds should be equal to that in sort columns." +
+      " Expected 5, actual 2." +
+      " The illegal bound is '400,aab'"))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: sort column is global dictionary encoded") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name','dictionary_include'='ID')")
+    // ID is a sort column and a dictionary column. Since the actual order and the literal order
+    // of this column are not necessarily the same, this will not cause an error but will cause skewed data.
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='400,name400;800,name800;1200,name1200;1600,name1600')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: sort column is global dictionary encoded" +
+       " but bounds are not in dictionary") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='name,ID','dictionary_include'='name')")
+    // 'name' is a sort column and a dictionary column, but the values for 'name' in the bounds
+    // do not exist in the dictionary. This will not cause an error but will cause skewed data.
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='nmm400,400;nme800,800;nme1200,1200;nme1600,1600')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data frame with sort column bounds") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    df.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "false")
+      .option("sort_columns", "ID,name")
+      .option("sort_column_bounds", "600,aab1;1200,aab1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    sql(s"select count(*) from $tableName").show()
+    sql(s"select count(*) from $tableName where ID > 1001").show()
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data frame with sort column bounds: number of column value in bounds should match that of sort column") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    val e = intercept[Exception] {
+      df.write
+        .format("carbondata")
+        .option("tableName", tableName)
+        .option("tempCSV", "false")
+        .option("sort_columns", "ID,name")
+        .option("sort_column_bounds", "600,aab1;1200,aab1,def")
+        .mode(SaveMode.Overwrite)
+        .save()
+    }
+    assert(e.getMessage.contains(
+      "The number of field in bounds should be equal to that in sort columns." +
+      " Expected 2, actual 3." +
+      " The illegal bound is '1200,aab1,def'"))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index ddb9b32..49a8023 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -163,7 +163,7 @@ class CarbonScanRDD(
         var i = 0
         val bucketed =
           splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => f.getBucketId)
-        (0 until bucketedTable.getNumberOfBuckets).map { bucketId =>
+        (0 until bucketedTable.getNumOfRanges).map { bucketId =>
           val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
           val multiBlockSplit =
             new CarbonMultiBlockSplit(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
index c73065d..3495885 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -50,7 +50,7 @@ object PartitionDropper {
     val bucketInfo = carbonTable.getBucketingInfo(tableName)
     val bucketNumber = bucketInfo match {
       case null => 1
-      case _ => bucketInfo.getNumberOfBuckets
+      case _ => bucketInfo.getNumOfRanges
     }
     val partitionIndex = oldPartitionIds.indexOf(Integer.valueOf(partitionId))
     val targetPartitionId = partitionInfo.getPartitionType match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index 9106cca..0d437f6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -46,7 +46,7 @@ object PartitionSplitter {
      var finalSplitStatus = false
      val bucketNumber = bucketInfo match {
        case null => 1
-       case _ => bucketInfo.getNumberOfBuckets
+       case _ => bucketInfo.getNumOfRanges
      }
      val partitionInfo = carbonTable.getPartitionInfo(tableName)
      val partitioner = PartitionFactory.getPartitioner(partitionInfo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index b1e4083..54ac7dc 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -881,7 +881,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH",
       "BATCH_SORT_SIZE_INMB", "GLOBAL_SORT_PARTITIONS", "SINGLE_PASS",
-      "IS_EMPTY_DATA_BAD_RECORD", "HEADER", "TIMESTAMPFORMAT", "SKIP_EMPTY_LINE"
+      "IS_EMPTY_DATA_BAD_RECORD", "HEADER", "TIMESTAMPFORMAT", "SKIP_EMPTY_LINE",
+      "SORT_COLUMN_BOUNDS"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 48679b1..f82d9c2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -434,7 +434,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       val cols = info.getListOfColumns.asScala
       val sortColumn = carbonTable.
         getDimensionByTableName(carbonTable.getTableName).get(0).getColName
-      val numBuckets = info.getNumberOfBuckets
+      val numBuckets = info.getNumOfRanges
       val bucketColumns = cols.flatMap { n =>
         val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
         attrRef match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 895fb79..c21c57f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.processing.loading.converter.DictionaryCardinalityFinder;
 
@@ -107,6 +108,8 @@ public class CarbonDataLoadConfiguration {
    */
   private short writingCoresCount;
 
+  private SortColumnRangeInfo sortColumnRangeInfo;
+
   /**
   * Folder path to where data should be written for this load.
    */
@@ -366,4 +369,12 @@ public class CarbonDataLoadConfiguration {
   public void setDataWritePath(String dataWritePath) {
     this.dataWritePath = dataWritePath;
   }
+
+  public SortColumnRangeInfo getSortColumnRangeInfo() {
+    return sortColumnRangeInfo;
+  }
+
+  public void setSortColumnRangeInfo(SortColumnRangeInfo sortColumnRangeInfo) {
+    this.sortColumnRangeInfo = sortColumnRangeInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index fc2796a..011eeae 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -22,23 +22,26 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
 import org.apache.carbondata.processing.loading.steps.CarbonRowDataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
-import org.apache.carbondata.processing.loading.steps.DataConverterProcessorWithBucketingStepImpl;
 import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.InputProcessorStepForPartitionImpl;
@@ -52,6 +55,8 @@ import org.apache.commons.lang3.StringUtils;
  * It builds the pipe line of steps for loading data to carbon.
  */
 public final class DataLoadProcessBuilder {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName());
 
   public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String[] storeLocation,
       CarbonIterator[] inputIterators) throws Exception {
@@ -150,7 +155,7 @@ public final class DataLoadProcessBuilder {
     // 2. Converts the data like dictionary or non dictionary or complex objects depends on
     // data types and configurations.
     AbstractDataLoadProcessorStep converterProcessorStep =
-        new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep);
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
     // 3. Sorts the data by SortColumn or not
     AbstractDataLoadProcessorStep sortProcessorStep =
         new SortProcessorStepImpl(configuration, converterProcessorStep);
@@ -252,6 +257,7 @@ public final class DataLoadProcessBuilder {
     configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
     configuration.setDataWritePath(loadModel.getDataWritePath());
+    setSortColumnInfo(carbonTable, loadModel, configuration);
     // For partition loading always use single core as it already runs in multiple
     // threads per partition
     if (carbonTable.isHivePartitionTable()) {
@@ -262,4 +268,71 @@ public final class DataLoadProcessBuilder {
     return configuration;
   }
 
+  /**
+   * set sort column info in configuration
+   * @param carbonTable carbon table
+   * @param loadModel load model
+   * @param configuration configuration
+   */
+  private static void setSortColumnInfo(CarbonTable carbonTable, CarbonLoadModel loadModel,
+      CarbonDataLoadConfiguration configuration) {
+    List<String> sortCols = carbonTable.getSortColumns(carbonTable.getTableName());
+    SortScopeOptions.SortScope sortScope = SortScopeOptions.getSortScope(loadModel.getSortScope());
+    if (!SortScopeOptions.SortScope.LOCAL_SORT.equals(sortScope)
+        || sortCols.size() == 0
+        || StringUtils.isBlank(loadModel.getSortColumnsBoundsStr())) {
+      if (!StringUtils.isBlank(loadModel.getSortColumnsBoundsStr())) {
+        LOGGER.warn("sort column bounds will be ignored");
+      }
+
+      configuration.setSortColumnRangeInfo(null);
+      return;
+    }
+    // column index for sort columns
+    int[] sortColIndex = new int[sortCols.size()];
+    boolean[] isSortColNoDict = new boolean[sortCols.size()];
+
+    DataField[] outFields = configuration.getDataFields();
+    int j = 0;
+    boolean columnExist;
+    for (String sortCol : sortCols) {
+      columnExist = false;
+
+      for (int i = 0; !columnExist && i < outFields.length; i++) {
+        if (outFields[i].getColumn().getColName().equalsIgnoreCase(sortCol)) {
+          columnExist = true;
+
+          sortColIndex[j] = i;
+          isSortColNoDict[j] = !outFields[i].hasDictionaryEncoding();
+          j++;
+        }
+      }
+
+      if (!columnExist) {
+        throw new CarbonDataLoadingException("Field " + sortCol + " does not exist.");
+      }
+    }
+
+    String[] sortColumnBounds = StringUtils.splitPreserveAllTokens(
+        loadModel.getSortColumnsBoundsStr(),
+        CarbonLoadOptionConstants.SORT_COLUMN_BOUNDS_ROW_DELIMITER, -1);
+    for (String bound : sortColumnBounds) {
+      String[] fieldInBounds = StringUtils.splitPreserveAllTokens(bound,
+          CarbonLoadOptionConstants.SORT_COLUMN_BOUNDS_FIELD_DELIMITER, -1);
+      if (fieldInBounds.length != sortCols.size()) {
+        String msg = new StringBuilder(
+            "The number of field in bounds should be equal to that in sort columns.")
+            .append(" Expected ").append(sortCols.size())
+            .append(", actual ").append(String.valueOf(fieldInBounds.length)).append(".")
+            .append(" The illegal bound is '").append(bound).append("'.").toString();
+        throw new CarbonDataLoadingException(msg);
+      }
+    }
+
+    SortColumnRangeInfo sortColumnRangeInfo = new SortColumnRangeInfo(sortColIndex,
+        isSortColNoDict,
+        sortColumnBounds,
+        CarbonLoadOptionConstants.SORT_COLUMN_BOUNDS_FIELD_DELIMITER);
+    configuration.setSortColumnRangeInfo(sortColumnRangeInfo);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
index fd3a650..016ff3f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
@@ -31,6 +31,6 @@ public interface RowConverter extends DictionaryCardinalityFinder {
   CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException;
 
   RowConverter createCopyForNewThread();
-
+  FieldConverter[] getFieldConverters();
   void finish();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index c5313cb..208d42f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -250,4 +250,9 @@ public class RowConverterImpl implements RowConverter {
     }
     return cardinality;
   }
+
+  @Override
+  public FieldConverter[] getFieldConverters() {
+    return fieldConverters;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 890492e..e516bc5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -189,6 +189,10 @@ public class CarbonLoadModel implements Serializable {
   private String globalSortPartitions;
 
   private boolean isAggLoadRequest;
+  /**
+   * sort columns bounds
+   */
+  private String sortColumnsBoundsStr;
 
   /**
    * It directly writes data directly to nosort processor bypassing all other processors.
@@ -367,6 +371,14 @@ public class CarbonLoadModel implements Serializable {
     this.dictionaryServiceProvider = dictionaryServiceProvider;
   }
 
+  public String getSortColumnsBoundsStr() {
+    return sortColumnsBoundsStr;
+  }
+
+  public void setSortColumnsBoundsStr(String sortColumnsBoundsStr) {
+    this.sortColumnsBoundsStr = sortColumnsBoundsStr;
+  }
+
   /**
    * get copy with partition
    *
@@ -468,6 +480,7 @@ public class CarbonLoadModel implements Serializable {
     copy.isAggLoadRequest = isAggLoadRequest;
     copy.badRecordsLocation = badRecordsLocation;
     copy.isPartitionLoad = isPartitionLoad;
+    copy.sortColumnsBoundsStr = sortColumnsBoundsStr;
     return copy;
   }
 
@@ -524,6 +537,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.batchSortSizeInMb = batchSortSizeInMb;
     copyObj.badRecordsLocation = badRecordsLocation;
     copyObj.isAggLoadRequest = isAggLoadRequest;
+    copyObj.sortColumnsBoundsStr = sortColumnsBoundsStr;
     return copyObj;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 99684ad..17e8dbe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -223,6 +223,7 @@ public class CarbonLoadModelBuilder {
 
     carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns));
     carbonLoadModel.readAndSetLoadMetadataDetails();
+    carbonLoadModel.setSortColumnsBoundsStr(optionsFinal.get("sort_column_bounds"));
   }
 
   private int validateMaxColumns(String[] csvHeaders, String maxColumns)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index bd942ca..5af4859 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -191,6 +191,7 @@ public class LoadOption {
     }
 
     optionsFinal.put("single_pass", String.valueOf(singlePass));
+    optionsFinal.put("sort_column_bounds", Maps.getOrDefault(options, "sort_column_bounds", ""));
     return optionsFinal;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
index f24d24f..e10faf6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.partition.impl;
 
 import java.util.List;
 
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
@@ -27,7 +29,8 @@ import org.apache.carbondata.processing.loading.partition.Partitioner;
 /**
  * Hash partitioner implementation
  */
-public class HashPartitionerImpl implements Partitioner<Object[]> {
+@InterfaceAudience.Internal
+public class HashPartitionerImpl implements Partitioner<CarbonRow> {
 
   private int numberOfBuckets;
 
@@ -50,10 +53,11 @@ public class HashPartitionerImpl implements Partitioner<Object[]> {
     }
   }
 
-  @Override public int getPartition(Object[] objects) {
+  @Override
+  public int getPartition(CarbonRow key) {
     int hashCode = 0;
     for (Hash hash : hashes) {
-      hashCode += hash.getHash(objects);
+      hashCode += hash.getHash(key.getData());
     }
     return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
   }
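
A side note on the unchanged return expression: masking with Integer.MAX_VALUE
clears the sign bit, so the modulo always yields a valid bucket index even when
the summed hash codes overflow to a negative int. A tiny standalone
demonstration (not CarbonData code):

    public final class HashMaskSketch {
      public static void main(String[] args) {
        int hashCode = Integer.MIN_VALUE + 7;  // pretend the summed hashes overflowed
        System.out.println(hashCode % 10);                        // -1, invalid bucket
        System.out.println((hashCode & Integer.MAX_VALUE) % 10);  // 7, valid bucket
      }
    }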

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RangePartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RangePartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RangePartitionerImpl.java
new file mode 100644
index 0000000..d59ad02
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RangePartitionerImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.partition.impl;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.partition.Partitioner;
+
+@InterfaceAudience.Internal
+public class RangePartitionerImpl implements Partitioner<CarbonRow> {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RangePartitionerImpl.class.getName());
+  private CarbonRow[] rangeBounds;
+  private Comparator<CarbonRow> comparator;
+
+  public RangePartitionerImpl(CarbonRow[] rangeBounds, Comparator<CarbonRow> comparator) {
+    this.rangeBounds = rangeBounds;
+    LOGGER.info("Use range partitioner to distribute data to "
+        + (rangeBounds.length + 1) + " ranges.");
+    this.comparator = comparator;
+  }
+
+  /**
+   * Adapted from Spark's org.apache.spark.RangePartitioner.
+   *
+   * @param key the row whose partition is to be determined
+   * @return partition id
+   */
+  @Override
+  public int getPartition(CarbonRow key) {
+    int partition = 0;
+    if (rangeBounds.length <= 128) {
+      // for 128 or fewer range bounds, a naive linear scan is fast enough
+      while (partition < rangeBounds.length
+          && comparator.compare(key, rangeBounds[partition]) > 0) {
+        partition += 1;
+      }
+    } else {
+      // binary search: Arrays.binarySearch returns the match index or -(insertion point) - 1
+      partition = Arrays.binarySearch(rangeBounds, 0, rangeBounds.length, key, comparator);
+      if (partition < 0) {
+        partition = -partition - 1;
+      }
+      if (partition > rangeBounds.length) {
+        partition = rangeBounds.length;
+      }
+    }
+
+    return partition;
+  }
+}

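[Editor's note] To make the binary-search branch concrete, here is a self-contained sketch of the same lookup logic over plain integers (the real class operates on CarbonRow bounds produced elsewhere in the load; the bound values below are hypothetical). Note that both branches agree on ties: a key equal to a bound lands in that bound's own partition.

import java.util.Arrays;
import java.util.Comparator;

public class RangeLookupSketch {
  public static void main(String[] args) {
    // Bounds 10, 20, 30 define four ranges: <=10, (10,20], (20,30], >30.
    Integer[] bounds = {10, 20, 30};
    Comparator<Integer> comparator = Integer::compare;
    for (int key : new int[] {5, 10, 15, 30, 99}) {
      int partition = Arrays.binarySearch(bounds, 0, bounds.length, key, comparator);
      if (partition < 0) {
        partition = -partition - 1; // recover the insertion point
      }
      if (partition > bounds.length) {
        partition = bounds.length;  // clamp, as in getPartition above
      }
      System.out.println(key + " -> partition " + partition);
      // prints: 5 -> 0, 10 -> 0, 15 -> 1, 30 -> 2, 99 -> 3
    }
  }
}
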
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
new file mode 100644
index 0000000..64b64f5
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.partition.impl;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+/**
+ * Comparator for converted rows that have not yet been rearranged into the 3-parted layout.
+ */
+@InterfaceAudience.Internal
+public class RawRowComparator implements Comparator<CarbonRow> {
+  private int[] sortColumnIndices;
+  private boolean[] isSortColumnNoDict;
+
+  public RawRowComparator(int[] sortColumnIndices, boolean[] isSortColumnNoDict) {
+    this.sortColumnIndices = sortColumnIndices;
+    this.isSortColumnNoDict = isSortColumnNoDict;
+  }
+
+  @Override
+  public int compare(CarbonRow o1, CarbonRow o2) {
+    int diff = 0;
+    int i = 0;
+    for (int colIdx : sortColumnIndices) {
+      if (isSortColumnNoDict[i]) {
+        byte[] colA = (byte[]) o1.getObject(colIdx);
+        byte[] colB = (byte[]) o2.getObject(colIdx);
+        diff = UnsafeComparer.INSTANCE.compareTo(colA, colB);
+        if (diff != 0) {
+          return diff;
+        }
+      } else {
+        int colA = (int) o1.getObject(colIdx);
+        int colB = (int) o2.getObject(colIdx);
+        diff = colA - colB;
+        if (diff != 0) {
+          return diff;
+        }
+      }
+      i++;
+    }
+    return diff;
+  }
+}

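[Editor's note] A hedged sketch of how this comparator composes with RangePartitionerImpl above. It assumes a single dictionary-encoded sort column at index 0 whose values are int surrogate keys; the bound rows here are invented for illustration (real bounds come from the parsed sort_column_bounds).

import java.util.Comparator;

import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.processing.loading.partition.Partitioner;
import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerImpl;
import org.apache.carbondata.processing.loading.partition.impl.RawRowComparator;

public class RangeDispatchSketch {
  public static void main(String[] args) {
    // One sort column at index 0, dictionary-encoded (compared as int).
    Comparator<CarbonRow> comparator =
        new RawRowComparator(new int[] {0}, new boolean[] {false});
    CarbonRow[] bounds = {
        new CarbonRow(new Object[] {100}),
        new CarbonRow(new Object[] {200}),
    };
    Partitioner<CarbonRow> partitioner = new RangePartitionerImpl(bounds, comparator);
    // 150 falls between the two bounds, so it goes to range 1 (of 3).
    System.out.println(partitioner.getPartition(new CarbonRow(new Object[] {150})));
  }
}

Since dictionary surrogate keys are non-negative ints, the int subtraction in compare above cannot overflow.
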
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
index a8f0282..b74b393 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
@@ -25,10 +25,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterWithColumnRangeImpl;
 import org.apache.carbondata.processing.loading.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
 import org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterWithColumnRangeImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 public class SorterFactory {
@@ -44,15 +44,21 @@ public class SorterFactory {
     Sorter sorter;
     if (offheapsort) {
       if (configuration.getBucketingInfo() != null) {
-        sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
+        sorter = new UnsafeParallelReadMergeSorterWithColumnRangeImpl(counter,
             configuration.getBucketingInfo());
+      } else if (configuration.getSortColumnRangeInfo() != null) {
+        sorter = new UnsafeParallelReadMergeSorterWithColumnRangeImpl(counter,
+            configuration.getSortColumnRangeInfo());
       } else {
         sorter = new UnsafeParallelReadMergeSorterImpl(counter);
       }
     } else {
       if (configuration.getBucketingInfo() != null) {
-        sorter =
-            new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo());
+        sorter = new ParallelReadMergeSorterWithColumnRangeImpl(counter,
+            configuration.getBucketingInfo());
+      } else if (configuration.getSortColumnRangeInfo() != null) {
+        sorter = new ParallelReadMergeSorterWithColumnRangeImpl(counter,
+            configuration.getSortColumnRangeInfo());
       } else {
         sorter = new ParallelReadMergeSorterImpl(counter);
       }

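[Editor's note] Both the bucketing branch and the new sort-column-bounds branch construct the same column-range sorter, which works because BucketingInfo and SortColumnRangeInfo both expose the ColumnRangeInfo interface introduced by this commit. A conceptual Java sketch of the shared dispatch (the method and variable names here are hypothetical):

import java.util.concurrent.atomic.AtomicLong;

import org.apache.carbondata.core.metadata.schema.ColumnRangeInfo;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.sort.Sorter;
import org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterWithColumnRangeImpl;

public class SorterSelectionSketch {
  // Collapses the two branches above via the shared ColumnRangeInfo interface.
  static Sorter pickRangeSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
    ColumnRangeInfo rangeInfo = configuration.getBucketingInfo() != null
        ? configuration.getBucketingInfo()
        : configuration.getSortColumnRangeInfo();
    return new ParallelReadMergeSorterWithColumnRangeImpl(counter, rangeInfo);
  }
}
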
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index b7452a7..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.loading.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
-import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data separately and write to
- * temp files.
- */
-public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ParallelReadMergeSorterWithBucketingImpl.class.getName());
-
-  private SortParameters sortParameters;
-
-  private SortIntermediateFileMerger[] intermediateFileMergers;
-
-  private BucketingInfo bucketingInfo;
-
-  private int sortBufferSize;
-
-  private AtomicLong rowCounter;
-
-  public ParallelReadMergeSorterWithBucketingImpl(AtomicLong rowCounter,
-      BucketingInfo bucketingInfo) {
-    this.rowCounter = rowCounter;
-    this.bucketingInfo = bucketingInfo;
-  }
-
-  @Override public void initialize(SortParameters sortParameters) {
-    this.sortParameters = sortParameters;
-    int buffer = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
-    sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
-    if (sortBufferSize < 100) {
-      sortBufferSize = 100;
-    }
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
-      throws CarbonDataLoadingException {
-    SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()];
-    intermediateFileMergers =
-        new SortIntermediateFileMerger[sortDataRows.length];
-    try {
-      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-        SortParameters parameters = sortParameters.getCopy();
-        parameters.setPartitionID(i + "");
-        setTempLocation(parameters);
-        parameters.setBufferSize(sortBufferSize);
-        intermediateFileMergers[i] = new SortIntermediateFileMerger(parameters);
-        sortDataRows[i] = new SortDataRows(parameters, intermediateFileMergers[i]);
-        sortDataRows[i].initialize();
-      }
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
-    this.threadStatusObserver = new ThreadStatusObserver(executorService);
-    final int batchSize = CarbonProperties.getInstance().getBatchSize();
-    try {
-      for (int i = 0; i < iterators.length; i++) {
-        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
-            this.threadStatusObserver));
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-      processRowToNextStep(sortDataRows, sortParameters);
-    } catch (Exception e) {
-      checkError();
-      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
-    }
-    checkError();
-    try {
-      for (int i = 0; i < intermediateFileMergers.length; i++) {
-        intermediateFileMergers[i].finish();
-      }
-    } catch (CarbonDataWriterException e) {
-      throw new CarbonDataLoadingException(e);
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-
-    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
-    for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-      batchIterator[i] = new MergedDataIterator(String.valueOf(i), batchSize);
-    }
-
-    return batchIterator;
-  }
-
-  private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        sortParameters.getDatabaseName(), sortParameters.getTableName(),
-        String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
-        false, false);
-    // Set the data file location
-    String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
-        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
-            sortParameters);
-  }
-
-  @Override public void close() {
-    for (int i = 0; i < intermediateFileMergers.length; i++) {
-      intermediateFileMergers[i].close();
-    }
-  }
-
-  /**
-   * Below method will be used to process data to next step
-   */
-  private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
-      throws CarbonDataLoadingException {
-    if (null == sortDataRows || sortDataRows.length == 0) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
-    try {
-      for (int i = 0; i < sortDataRows.length; i++) {
-        // start sorting
-        sortDataRows[i].startSorting();
-      }
-      // check any more rows are present
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      return false;
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  private void setTempLocation(SortParameters parameters) {
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
-        parameters.getSegmentId(), false, false);
-    String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
-        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    parameters.setTempFileLocation(tmpLocs);
-  }
-
-  /**
-   * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
-   */
-  private static class SortIteratorThread implements Runnable {
-
-    private Iterator<CarbonRowBatch> iterator;
-
-    private SortDataRows[] sortDataRows;
-
-    private AtomicLong rowCounter;
-
-    private ThreadStatusObserver threadStatusObserver;
-
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows,
-        AtomicLong rowCounter, ThreadStatusObserver observer) {
-      this.iterator = iterator;
-      this.sortDataRows = sortDataRows;
-      this.rowCounter = rowCounter;
-      this.threadStatusObserver = observer;
-    }
-
-    @Override
-    public void run() {
-      try {
-        while (iterator.hasNext()) {
-          CarbonRowBatch batch = iterator.next();
-          int i = 0;
-          while (batch.hasNext()) {
-            CarbonRow row = batch.next();
-            if (row != null) {
-              SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
-              synchronized (sortDataRow) {
-                sortDataRow.addRow(row.getData());
-                rowCounter.getAndAdd(1);
-              }
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-        this.threadStatusObserver.notifyFailed(e);
-      }
-    }
-
-  }
-
-  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
-
-    private String partitionId;
-
-    private int batchSize;
-
-    private boolean firstRow = true;
-
-    public MergedDataIterator(String partitionId, int batchSize) {
-      this.partitionId = partitionId;
-      this.batchSize = batchSize;
-    }
-
-    private SingleThreadFinalSortFilesMerger finalMerger;
-
-    @Override public boolean hasNext() {
-      if (firstRow) {
-        firstRow = false;
-        finalMerger = getFinalMerger(partitionId);
-        finalMerger.startFinalMerge();
-      }
-      return finalMerger.hasNext();
-    }
-
-    @Override public CarbonRowBatch next() {
-      int counter = 0;
-      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
-      while (finalMerger.hasNext() && counter < batchSize) {
-        rowBatch.addRow(new CarbonRow(finalMerger.next()));
-        counter++;
-      }
-      return rowBatch;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
new file mode 100644
index 0000000..808952b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.ColumnRangeInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data in parallel from an array of iterators and performs a merge sort:
+ * it first sorts the data and writes it to temp files, and these temp files are
+ * then merge-sorted to produce the final result.
+ * This step is specific to loads that partition rows by column value range (e.g.
+ * bucketing or sort_column_bounds); each range is sorted separately into its own temp files.
+ */
+public class ParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSorter {
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      ParallelReadMergeSorterWithColumnRangeImpl.class.getName());
+
+  private SortParameters originSortParameters;
+
+  private SortIntermediateFileMerger[] intermediateFileMergers;
+
+  private ColumnRangeInfo columnRangeInfo;
+
+  private int sortBufferSize;
+
+  private AtomicLong rowCounter;
+  /**
+   * counters to collect information about rows processed by each range
+   */
+  private List<AtomicLong> insideRowCounterList;
+
+  public ParallelReadMergeSorterWithColumnRangeImpl(AtomicLong rowCounter,
+      ColumnRangeInfo columnRangeInfo) {
+    this.rowCounter = rowCounter;
+    this.columnRangeInfo = columnRangeInfo;
+  }
+
+  @Override
+  public void initialize(SortParameters sortParameters) {
+    this.originSortParameters = sortParameters;
+    int buffer = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
+    sortBufferSize = buffer / columnRangeInfo.getNumOfRanges();
+    if (sortBufferSize < 100) {
+      sortBufferSize = 100;
+    }
+    this.insideRowCounterList = new ArrayList<>(columnRangeInfo.getNumOfRanges());
+    for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+      insideRowCounterList.add(new AtomicLong(0));
+    }
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    SortDataRows[] sortDataRows = new SortDataRows[columnRangeInfo.getNumOfRanges()];
+    intermediateFileMergers = new SortIntermediateFileMerger[columnRangeInfo.getNumOfRanges()];
+    SortParameters[] sortParameterArray = new SortParameters[columnRangeInfo.getNumOfRanges()];
+    try {
+      for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+        SortParameters parameters = originSortParameters.getCopy();
+        parameters.setPartitionID(i + "");
+        parameters.setRangeId(i);
+        sortParameterArray[i] = parameters;
+        setTempLocation(parameters);
+        parameters.setBufferSize(sortBufferSize);
+        intermediateFileMergers[i] = new SortIntermediateFileMerger(parameters);
+        sortDataRows[i] = new SortDataRows(parameters, intermediateFileMergers[i]);
+        sortDataRows[i].initialize();
+      }
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(executorService);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      // dispatch rows to sortDataRows by range id
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
+            this.insideRowCounterList, this.threadStatusObserver));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRows, originSortParameters);
+    } catch (Exception e) {
+      checkError();
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    checkError();
+    try {
+      for (int i = 0; i < intermediateFileMergers.length; i++) {
+        intermediateFileMergers[i].finish();
+      }
+    } catch (CarbonDataWriterException e) {
+      throw new CarbonDataLoadingException(e);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[columnRangeInfo.getNumOfRanges()];
+    for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+      batchIterator[i] = new MergedDataIterator(sortParameterArray[i], batchSize);
+    }
+
+    return batchIterator;
+  }
+
+  private SingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) {
+    String[] storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()),
+            sortParameters.getSegmentId() + "", false, false);
+    // Set the data file location
+    String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
+        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
+        sortParameters);
+  }
+
+  @Override public void close() {
+    for (int i = 0; i < intermediateFileMergers.length; i++) {
+      intermediateFileMergers[i].close();
+    }
+  }
+
+  /**
+   * Triggers sorting for each range so the sorted data can be consumed by the next step.
+   */
+  private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows || sortDataRows.length == 0) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        // start sorting
+        sortDataRows[i].startSorting();
+      }
+      // all rows are consumed; log and record processing statistics
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  private void setTempLocation(SortParameters parameters) {
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(),
+            parameters.getTableName(), parameters.getTaskNo(),
+            parameters.getSegmentId(), false, false);
+    String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
+        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    parameters.setTempFileLocation(tmpLocs);
+  }
+
+  /**
+   * This thread iterates over its input iterator and adds each row to the {@link SortDataRows} of its range.
+   */
+  private static class SortIteratorThread implements Runnable {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private SortDataRows[] sortDataRows;
+
+    private AtomicLong rowCounter;
+    private List<AtomicLong> insideCounterList;
+    private ThreadStatusObserver threadStatusObserver;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows,
+        AtomicLong rowCounter, List<AtomicLong> insideCounterList,
+        ThreadStatusObserver observer) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.rowCounter = rowCounter;
+      this.insideCounterList = insideCounterList;
+      this.threadStatusObserver = observer;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              SortDataRows sortDataRow = sortDataRows[row.getRangeId()];
+              synchronized (sortDataRow) {
+                sortDataRow.addRow(row.getData());
+                insideCounterList.get(row.getRangeId()).getAndIncrement();
+                rowCounter.getAndAdd(1);
+              }
+            }
+          }
+        }
+        LOGGER.info("Rows processed by each range: " + insideCounterList);
+      } catch (Exception e) {
+        LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
+      }
+    }
+
+  }
+
+  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private SortParameters sortParameters;
+
+    private int batchSize;
+
+    private boolean firstRow = true;
+
+    public MergedDataIterator(SortParameters sortParameters, int batchSize) {
+      this.sortParameters = sortParameters;
+      this.batchSize = batchSize;
+    }
+
+    private SingleThreadFinalSortFilesMerger finalMerger;
+
+    @Override public boolean hasNext() {
+      if (firstRow) {
+        firstRow = false;
+        finalMerger = getFinalMerger(sortParameters);
+        finalMerger.startFinalMerge();
+      }
+      return finalMerger.hasNext();
+    }
+
+    @Override public CarbonRowBatch next() {
+      int counter = 0;
+      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+      while (finalMerger.hasNext() && counter < batchSize) {
+        rowBatch.addRow(new CarbonRow(finalMerger.next()));
+        counter++;
+      }
+      return rowBatch;
+    }
+  }
+}