You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/05/11 13:53:03 UTC

[13/50] carbondata git commit: Added unsafe sort for bucketing feature

Added unsafe sort for bucketing feature

Rebased


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f82b10b4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f82b10b4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f82b10b4

Branch: refs/heads/branch-1.1
Commit: f82b10b4189a82922b02d784effa0099681badf7
Parents: 061c3c2
Author: ravipesala <ra...@gmail.com>
Authored: Sun Mar 26 19:12:24 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Thu May 4 21:20:36 2017 +0800

----------------------------------------------------------------------
 .../bucketing/TableBucketingTestCase.scala      |  24 ++
 .../processing/newflow/sort/SorterFactory.java  |  73 +++++
 ...arallelReadMergeSorterWithBucketingImpl.java |  18 +-
 .../UnsafeBatchParallelReadMergeSorterImpl.java |   7 +-
 .../impl/UnsafeParallelReadMergeSorterImpl.java |   6 +-
 ...arallelReadMergeSorterWithBucketingImpl.java | 264 +++++++++++++++++++
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |  36 ++-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |   5 +-
 ...ConverterProcessorWithBucketingStepImpl.java |   3 +-
 .../newflow/steps/SortProcessorStepImpl.java    |  25 +-
 10 files changed, 423 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 8d3eed7..2731812 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -45,6 +45,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS t7")
     sql("DROP TABLE IF EXISTS t8")
     sql("DROP TABLE IF EXISTS t9")
+    sql("DROP TABLE IF EXISTS t10")
   }
 
   test("test create table with buckets") {
@@ -66,6 +67,27 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test create table with buckets unsafe") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+    sql(
+      """
+           CREATE TABLE t10
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t10")
+      """)
+    LoadTable(Some("default"), "t10", s"$resourcesPath/source.csv", Nil,
+      Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t10")
+    if (table != null && table.getBucketingInfo("t10") != null) {
+      assert(true)
+    } else {
+      assert(false, "Bucketing info does not exist")
+    }
+  }
+
   test("must be unable to create if number of buckets is in negative number") {
     try {
       sql(
@@ -215,6 +237,8 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS t6")
     sql("DROP TABLE IF EXISTS t7")
     sql("DROP TABLE IF EXISTS t8")
+    sql("DROP TABLE IF EXISTS t9")
+    sql("DROP TABLE IF EXISTS t10")
     sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", threshold.toString)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
new file mode 100644
index 0000000..60cca69
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
+
+public class SorterFactory {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SorterFactory.class.getName());
+
+  public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
+    boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+            CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
+    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
+            CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
+    Sorter sorter;
+    if (offheapsort) {
+      if (configuration.getBucketingInfo() != null) {
+        sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
+            configuration.getBucketingInfo());
+      } else {
+        sorter = new UnsafeParallelReadMergeSorterImpl(counter);
+      }
+    } else {
+      if (configuration.getBucketingInfo() != null) {
+        sorter =
+            new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo());
+      } else {
+        sorter = new ParallelReadMergeSorterImpl(counter);
+      }
+    }
+    if (batchSort) {
+      if (configuration.getBucketingInfo() == null) {
+        sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
+      } else {
+        LOGGER.warn(
+            "Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass()
+                .getName());
+      }
+    }
+    return sorter;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index e5af1c6..cb1b8fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -57,10 +57,10 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
 
   private SortParameters sortParameters;
 
-  private SortIntermediateFileMerger intermediateFileMerger;
-
   private ExecutorService executorService;
 
+  private SortIntermediateFileMerger[] intermediateFileMergers;
+
   private BucketingInfo bucketingInfo;
 
   private int sortBufferSize;
@@ -75,7 +75,6 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
 
   @Override public void initialize(SortParameters sortParameters) {
     this.sortParameters = sortParameters;
-    intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
     int buffer = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
     sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
@@ -87,13 +86,16 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
   @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
       throws CarbonDataLoadingException {
     SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()];
+    intermediateFileMergers =
+        new SortIntermediateFileMerger[sortDataRows.length];
     try {
       for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
         SortParameters parameters = sortParameters.getCopy();
         parameters.setPartitionID(i + "");
         setTempLocation(parameters);
         parameters.setBufferSize(sortBufferSize);
-        sortDataRows[i] = new SortDataRows(parameters, intermediateFileMerger);
+        intermediateFileMergers[i] = new SortIntermediateFileMerger(parameters);
+        sortDataRows[i] = new SortDataRows(parameters, intermediateFileMergers[i]);
         sortDataRows[i].initialize();
       }
     } catch (CarbonSortKeyAndGroupByException e) {
@@ -116,7 +118,9 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
     }
     checkError();
     try {
-      intermediateFileMerger.finish();
+      for (int i = 0; i < intermediateFileMergers.length; i++) {
+        intermediateFileMergers[i].finish();
+      }
     } catch (CarbonDataWriterException e) {
       throw new CarbonDataLoadingException(e);
     } catch (CarbonSortKeyAndGroupByException e) {
@@ -148,7 +152,9 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
   }
 
   @Override public void close() {
-    intermediateFileMerger.close();
+    for (int i = 0; i < intermediateFileMergers.length; i++) {
+      intermediateFileMergers[i].close();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index a54410c..0c6fa27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -196,9 +196,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
     }
 
     private void createSortDataRows() {
-      this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters);
+      int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+      this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
+          sortParameters.getTempFileLocation());
       unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
-      sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger);
+      sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger,
+          inMemoryChunkSizeInMB);
 
       try {
         sortDataRow.initialize();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 0caafec..503f92a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -78,13 +78,15 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
     // Set the data file location
     String dataFolderLocation =
         storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
-    finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters);
+    finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
+        sortParameters.getTempFileLocation());
   }
 
   @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
       throws CarbonDataLoadingException {
+    int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
     UnsafeSortDataRows sortDataRow =
-        new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger);
+        new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB);
     final int batchSize = CarbonProperties.getInstance().getBatchSize();
     try {
       sortDataRow.initialize();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
new file mode 100644
index 0000000..43abf66
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Reads data in parallel from an array of iterators and performs a merge sort.
+ * It first sorts the data and writes it to temp files. These temp files are then merge sorted
+ * to produce the final sorted result.
+ * This sorter is specifically for bucketing: it sorts the data of each bucket separately and
+ * writes it to temp files.
+ */
+public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private ExecutorService executorService;
+
+  private BucketingInfo bucketingInfo;
+
+  private DataField[] inputDataFields;
+
+  public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
+      BucketingInfo bucketingInfo) {
+    this.inputDataFields = inputDataFields;
+    this.bucketingInfo = bucketingInfo;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+    int buffer = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
+    UnsafeIntermediateMerger[] intermediateFileMergers =
+        new UnsafeIntermediateMerger[sortDataRows.length];
+    int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+    inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / bucketingInfo.getNumberOfBuckets();
+    if (inMemoryChunkSizeInMB < 5) {
+      inMemoryChunkSizeInMB = 5;
+    }
+    try {
+      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+        SortParameters parameters = sortParameters.getCopy();
+        parameters.setPartitionID(i + "");
+        setTempLocation(parameters);
+        intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
+        sortDataRows[i] =
+            new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
+        sortDataRows[i].initialize();
+      }
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    this.executorService = Executors.newFixedThreadPool(iterators.length);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.submit(new SortIteratorThread(iterators[i], sortDataRows));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRows, sortParameters);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    try {
+      for (int i = 0; i < intermediateFileMergers.length; i++) {
+        intermediateFileMergers[i].finish();
+      }
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
+    for (int i = 0; i < sortDataRows.length; i++) {
+      batchIterator[i] =
+          new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
+    }
+
+    return batchIterator;
+  }
+
+  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
+    String storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()), bucketId,
+            sortParameters.getSegmentId() + "", false);
+    // Set the data file location
+    String dataFolderLocation =
+        storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
+    return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation);
+  }
+
+  @Override public void close() {
+  }
+
+  /**
+   * Starts the sort on every bucket's {@link UnsafeSortDataRows} and records load statistics
+   */
+  private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows || sortDataRows.length == 0) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        // start sorting
+        sortDataRows[i].startSorting();
+      }
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  private void setTempLocation(SortParameters parameters) {
+    String carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
+            parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(), false);
+    parameters.setTempFileLocation(
+        carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+  }
+
+  /**
+   * This thread iterates the iterator and adds the rows to {@link UnsafeSortDataRows}
+   */
+  private static class SortIteratorThread implements Callable<Void> {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private UnsafeSortDataRows[] sortDataRows;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+        UnsafeSortDataRows[] sortDataRows) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+    }
+
+    @Override public Void call() throws CarbonDataLoadingException {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          int i = 0;
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              UnsafeSortDataRows sortDataRow = sortDataRows[row.bucketNumber];
+              synchronized (sortDataRow) {
+                sortDataRow.addRow(row.getData());
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        throw new CarbonDataLoadingException(e);
+      }
+      return null;
+    }
+
+  }
+
+  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private String partitionId;
+
+    private int batchSize;
+
+    private boolean firstRow;
+
+    private UnsafeIntermediateMerger intermediateMerger;
+
+    public MergedDataIterator(String partitionId, int batchSize,
+        UnsafeIntermediateMerger intermediateMerger) {
+      this.partitionId = partitionId;
+      this.batchSize = batchSize;
+      this.intermediateMerger = intermediateMerger;
+      this.firstRow = true;
+    }
+
+    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+    @Override public boolean hasNext() {
+      if (firstRow) {
+        firstRow = false;
+        finalMerger = getFinalMerger(partitionId);
+        List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
+        finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+            intermediateMerger.getMergedPages());
+      }
+      return finalMerger.hasNext();
+    }
+
+    @Override public CarbonRowBatch next() {
+      int counter = 0;
+      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+      while (finalMerger.hasNext() && counter < batchSize) {
+        rowBatch.addRow(new CarbonRow(finalMerger.next()));
+        counter++;
+      }
+      return rowBatch;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 3afd3b0..df3825a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -84,7 +84,7 @@ public class UnsafeSortDataRows {
   private Semaphore semaphore;
 
   public UnsafeSortDataRows(SortParameters parameters,
-      UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger) {
+      UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
     this.parameters = parameters;
 
     this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
@@ -92,7 +92,7 @@ public class UnsafeSortDataRows {
     // observer of writing file in thread
     this.threadStatusObserver = new ThreadStatusObserver();
 
-    this.inMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+    this.inMemoryChunkSize = inMemoryChunkSize;
     this.inMemoryChunkSize = this.inMemoryChunkSize * 1024 * 1024;
     enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
@@ -193,6 +193,38 @@ public class UnsafeSortDataRows {
   }
 
   /**
+   * This method will be used to add new row
+   */
+  public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
+    // if the current row page is full, submit it to the sorter/writer executor and
+    // continue adding rows to a newly allocated row page
+    if (rowPage.canAdd()) {
+      rowPage.addRow(row);
+    } else {
+      try {
+        if (enableInMemoryIntermediateMerge) {
+          unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+        }
+        unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+        semaphore.acquire();
+        dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
+        MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
+        boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+        rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+            parameters.getDimColCount(), parameters.getMeasureColCount(),
+            parameters.getAggType(), memoryBlock,
+            saveToDisk);
+        rowPage.addRow(row);
+      } catch (Exception e) {
+        LOGGER.error(
+            "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+        throw new CarbonSortKeyAndGroupByException(e);
+      }
+
+    }
+  }
+
+  /**
    * Below method will be used to start storing process This method will get
    * all the temp files present in sort temp folder then it will create the
    * record holder heap and then it will read first record from each file and

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 95a337a..cd6b321 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -82,7 +82,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
 
   private boolean isStopProcess;
 
-  public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters) {
+  public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
+      String tempFileLocation) {
     this.parameters = parameters;
     // set measure and dimension count
     this.measureCount = parameters.getMeasureColCount();
@@ -91,7 +92,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
 
     this.noDictionaryCount = parameters.getNoDictionaryCount();
     this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
-    this.tempFileLocation = parameters.getTempFileLocation();
+    this.tempFileLocation = tempFileLocation;
     this.tableName = parameters.getTableName();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index 86971c3..78df028 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -128,8 +128,9 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
     CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
     while (rowBatch.hasNext()) {
       CarbonRow next = rowBatch.next();
+      short bucketNumber = (short) partitioner.getPartition(next.getData());
       CarbonRow convertRow = localConverter.convert(next);
-      convertRow.bucketNumber = (short) partitioner.getPartition(next.getData());
+      convertRow.bucketNumber = bucketNumber;
       newBatch.addRow(convertRow);
     }
     rowCounter.getAndAdd(newBatch.getSize());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
index 17cc01e..698459c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.processing.newflow.steps;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -28,10 +26,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
 import org.apache.carbondata.processing.newflow.sort.Sorter;
-import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.SorterFactory;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 
 /**
@@ -56,23 +51,7 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
   public void initialize() throws IOException {
     child.initialize();
     SortParameters sortParameters = SortParameters.createSortParameters(configuration);
-    boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-            CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
-    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
-            CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
-    if (batchSort) {
-      sorter = new UnsafeBatchParallelReadMergeSorterImpl(rowCounter);
-    } else if (offheapsort) {
-      sorter = new UnsafeParallelReadMergeSorterImpl(rowCounter);
-    } else {
-      sorter = new ParallelReadMergeSorterImpl(rowCounter);
-    }
-    if (configuration.getBucketingInfo() != null) {
-      sorter = new ParallelReadMergeSorterWithBucketingImpl(rowCounter,
-          configuration.getBucketingInfo());
-    }
+    sorter = SorterFactory.createSorter(configuration, rowCounter);
     sorter.initialize(sortParameters);
   }