You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/02/20 05:36:38 UTC

[carbondata] branch master updated: [CARBONDATA-3679] Optimize local sort performance

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new a2ed889  [CARBONDATA-3679] Optimize local sort performance
a2ed889 is described below

commit a2ed8892da1b3f5e8d2c4261be097841b64df1be
Author: Manhua <ke...@qq.com>
AuthorDate: Sat Feb 15 10:46:59 2020 +0800

    [CARBONDATA-3679] Optimize local sort performance
    
    Why is this PR needed?
    In local sort, multi-threads is used for each partition but adding rows to a same object with lock. Only after that, sort and write operations run.
    For better performance, we want to do the sort and write(sortTemp file) operations in parallel.
    
    What changes were proposed in this PR?
    Remove the object lock when adding rows to (Unsafe)SortDataRows.
    Keep the object lock in (Unsafe)IntermediateMerger to collect the results of all threads.
    
    For unsafe local sort, sort time was reduced by about 40% with multiple cores in one case.
    
    Performance-related configuration includes: carbon.number.of.cores.while.loading & yarn.nodemanager.local-dirs
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3603
---
 .../query/SecondaryIndexQueryResultProcessor.java  |   2 +
 .../sort/impl/ParallelReadMergeSorterImpl.java     |  43 ++--
 ...ParallelReadMergeSorterWithColumnRangeImpl.java |   4 +
 .../impl/UnsafeParallelReadMergeSorterImpl.java    |  51 ++--
 ...ParallelReadMergeSorterWithColumnRangeImpl.java |   6 +-
 .../loading/sort/unsafe/UnsafeSortDataRows.java    | 266 ++++++---------------
 .../unsafe/merger/UnsafeIntermediateMerger.java    |  55 ++---
 .../merger/CompactionResultSortProcessor.java      |   4 +
 .../processing/sort/sortdata/SortDataRows.java     | 234 +++++-------------
 .../sort/sortdata/SortIntermediateFileMerger.java  |  28 +--
 10 files changed, 209 insertions(+), 484 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java b/integration/spark2/src/main/scala/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index c71ac6f..e6e08bf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -419,6 +419,8 @@ public class SecondaryIndexQueryResultProcessor {
     }
     dimensionColumnCount = dimensions.size();
     sortParameters = createSortParameters();
+    CarbonDataProcessorUtil.deleteSortLocationIfExists(sortParameters.getTempFileLocation());
+    CarbonDataProcessorUtil.createLocations(sortParameters.getTempFileLocation());
     intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
     this.sortDataRows = new SortDataRows(sortParameters, intermediateFileMerger);
     this.sortDataRows.initialize();
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index 9b04a61..7fc715e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -82,28 +82,37 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
     finalMerger =
         new SingleThreadFinalSortFilesMerger(dataFolderLocations, sortParameters.getTableName(),
             sortParameters);
+    // Delete if any older file exists in sort temp folder
+    CarbonDataProcessorUtil.deleteSortLocationIfExists(sortParameters.getTempFileLocation());
+    // create new sort temp directory
+    CarbonDataProcessorUtil.createLocations(sortParameters.getTempFileLocation());
   }
 
   @Override
   public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
       throws CarbonDataLoadingException {
-    SortDataRows sortDataRow = new SortDataRows(sortParameters, intermediateFileMerger);
     final int batchSize = CarbonProperties.getInstance().getBatchSize();
-    sortDataRow.initialize();
-    this.executorService = Executors.newFixedThreadPool(iterators.length,
+    this.executorService = Executors.newFixedThreadPool(sortParameters.getNumberOfCores(),
         new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName(),
                 true));
     this.threadStatusObserver = new ThreadStatusObserver(executorService);
 
     try {
       for (int i = 0; i < iterators.length; i++) {
+        SortDataRows sortDataRows = new SortDataRows(sortParameters, intermediateFileMerger);
+        sortDataRows.setInstanceId(i);
         executorService.execute(
-            new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
+            new SortIteratorThread(iterators[i], sortDataRows, batchSize, rowCounter,
                 threadStatusObserver));
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
-      processRowToNextStep(sortDataRow, sortParameters);
+      LOGGER.info("Record Processed For table: " + sortParameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(sortParameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(sortParameters.getPartitionID(),
+                      System.currentTimeMillis());
     } catch (Exception e) {
       checkError();
       throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
@@ -152,28 +161,6 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
   }
 
   /**
-   * Below method will be used to process data to next step
-   */
-  private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters)
-      throws CarbonDataLoadingException {
-    try {
-      // start sorting
-      sortDataRows.startSorting();
-
-      // check any more rows are present
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
-              System.currentTimeMillis());
-      return false;
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  /**
    * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
    */
   private static class SortIteratorThread implements Runnable {
@@ -201,6 +188,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
     @Override
     public void run() {
       try {
+        sortDataRows.initialize();
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
           int i = 0;
@@ -215,6 +203,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
             rowCounter.getAndAdd(i);
           }
         }
+        sortDataRows.startSorting();
       } catch (Exception e) {
         LOGGER.error(e.getMessage(), e);
         observer.notifyFailed(e);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
index cc106ad..a74b3b7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
@@ -90,6 +90,10 @@ public class ParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSor
     for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
       insideRowCounterList.add(new AtomicLong(0));
     }
+    // Delete if any older file exists in sort temp folder
+    CarbonDataProcessorUtil.deleteSortLocationIfExists(sortParameters.getTempFileLocation());
+    // create new sort temp directory
+    CarbonDataProcessorUtil.createLocations(sortParameters.getTempFileLocation());
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 9788777..8a237e3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInterme
 import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.log4j.Logger;
 
@@ -74,35 +75,38 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
 
     finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
         sortParameters.getTempFileLocation());
+    // Delete if any older file exists in sort temp folder
+    CarbonDataProcessorUtil.deleteSortLocationIfExists(sortParameters.getTempFileLocation());
+    // create new sort temp directory
+    CarbonDataProcessorUtil.createLocations(sortParameters.getTempFileLocation());
   }
 
   @Override
   public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
       throws CarbonDataLoadingException {
     int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-    UnsafeSortDataRows sortDataRow =
-        new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB);
     final int batchSize = CarbonProperties.getInstance().getBatchSize();
-    try {
-      sortDataRow.initialize();
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    this.executorService = Executors.newFixedThreadPool(iterators.length,
+    this.executorService = Executors.newFixedThreadPool(sortParameters.getNumberOfCores(),
         new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName(),
                 true));
     this.threadStatusObserver = new ThreadStatusObserver(executorService);
 
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService.execute(
-            new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
-                this.threadStatusObserver));
+        UnsafeSortDataRows sortDataRows = new UnsafeSortDataRows(
+                sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB);
+        sortDataRows.setInstanceId(i);
+        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows,
+                batchSize, rowCounter, this.threadStatusObserver));
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
       if (!sortParameters.getObserver().isFailed()) {
-        processRowToNextStep(sortDataRow, sortParameters);
+        LOGGER.info("Record Processed For table: " + sortParameters.getTableName());
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordSortRowsStepTotalTime(
+            sortParameters.getPartitionID(), System.currentTimeMillis());
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValuesTotalTime(
+            sortParameters.getPartitionID(), System.currentTimeMillis());
       }
     } catch (Exception e) {
       checkError();
@@ -152,27 +156,6 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
   }
 
   /**
-   * Below method will be used to process data to next step
-   */
-  private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
-      throws CarbonDataLoadingException {
-    try {
-      // start sorting
-      sortDataRows.startSorting();
-
-      // check any more rows are present
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      return false;
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  /**
    * This thread iterates the iterator and adds the rows
    */
   private static class SortIteratorThread implements Runnable {
@@ -200,6 +183,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
     @Override
     public void run() {
       try {
+        sortDataRows.initialize();
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
           int i = 0;
@@ -214,6 +198,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
             rowCounter.getAndAdd(i);
           }
         }
+        sortDataRows.startSorting();
       } catch (Exception e) {
         LOGGER.error(e.getMessage(), e);
         this.threadStatusObserver.notifyFailed(e);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
index 83bc5d8..0f30612 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
@@ -87,6 +87,10 @@ public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe
     for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
       insideRowCounterList.add(new AtomicLong(0));
     }
+    // Delete if any older file exists in sort temp folder
+    CarbonDataProcessorUtil.deleteSortLocationIfExists(sortParameters.getTempFileLocation());
+    // create new sort temp directory
+    CarbonDataProcessorUtil.createLocations(sortParameters.getTempFileLocation());
   }
 
   @Override
@@ -284,4 +288,4 @@ public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe
       return rowBatch;
     }
   }
-}
\ No newline at end of file
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 5dfad9c..493faa3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -22,10 +22,6 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -36,8 +32,6 @@ import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
@@ -50,28 +44,13 @@ import org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDa
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
 import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.log4j.Logger;
 
 public class UnsafeSortDataRows {
-  /**
-   * LOGGER
-   */
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(UnsafeSortDataRows.class.getName());
-  /**
-   * threadStatusObserver
-   */
   private ThreadStatusObserver threadStatusObserver;
-  /**
-   * executor service for data sort holder
-   */
-  private ExecutorService dataSorterAndWriterExecutorService;
-  /**
-   * semaphore which will used for managing sorted data object arrays
-   */
-
   private SortParameters parameters;
   private TableFieldStat tableFieldStat;
   private ThreadLocal<ReUsableByteArrayDataOutputStream> reUsableByteArrayDataOutputStream;
@@ -79,23 +58,12 @@ public class UnsafeSortDataRows {
 
   private UnsafeCarbonRowPage rowPage;
 
-  private final Object addRowsLock = new Object();
-
   private long inMemoryChunkSize;
 
-  private boolean enableInMemoryIntermediateMerge;
-
-  private int bytesAdded;
-
-  private long maxSizeAllowed;
-
-  /**
-   * semaphore which will used for managing sorted data object arrays
-   */
-  private Semaphore semaphore;
-
   private final String taskId;
 
+  private int instanceId;
+
   public UnsafeSortDataRows(SortParameters parameters,
       UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
     this.parameters = parameters;
@@ -113,33 +81,18 @@ public class UnsafeSortDataRows {
     this.threadStatusObserver = new ThreadStatusObserver();
     this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
     this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L;
-    enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
-            CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
+  }
 
-    // Take half the size of usable memory configured in sort memory size.
-    this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
+  public void setInstanceId(int instanceId) {
+    this.instanceId = instanceId;
   }
 
-  /**
-   * This method will be used to initialize
-   */
   public void initialize() {
     MemoryBlock baseBlock =
         UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
     boolean isMemoryAvailable =
         UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
     this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, taskId, isMemoryAvailable);
-    // Delete if any older file exists in sort temp folder
-    deleteSortLocationIfExists();
-
-    // create new sort temp directory
-    CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
-    this.dataSorterAndWriterExecutorService = Executors
-        .newFixedThreadPool(parameters.getNumberOfCores(),
-            new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName(),
-                    true));
-    semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
   private UnsafeCarbonRowPage createUnsafeRowPage() {
@@ -154,39 +107,7 @@ public class UnsafeSortDataRows {
     return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, taskId, true);
   }
 
-  public boolean canAdd() {
-    return bytesAdded < maxSizeAllowed;
-  }
-
-  /**
-   * This method will be used to add new row
-   *
-   * @param rowBatch new rowBatch
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
   public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
-    // if record holder list size is equal to sort buffer size then it will
-    // sort the list and then write current list data to file
-    synchronized (addRowsLock) {
-      addBatch(rowBatch, size);
-    }
-  }
-
-  /**
-   * This method will be used to add new row
-   *
-   * @param rowBatch new rowBatch
-   * @param size
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
-  public void addRowBatchWithOutSync(Object[][] rowBatch, int size)
-      throws CarbonSortKeyAndGroupByException {
-    // if record holder list size is equal to sort buffer size then it will
-    // sort the list and then write current list data to file
-    addBatch(rowBatch, size);
-  }
-
-  private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
     if (rowPage == null) {
       return;
     }
@@ -204,7 +125,7 @@ public class UnsafeSortDataRows {
             throw new CarbonSortKeyAndGroupByException(ex);
           }
         }
-        bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
+        rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
       } catch (Exception e) {
         if (e.getMessage().contains("cannot handle this row. create new page")) {
           rowPage.makeCanAddFail();
@@ -219,9 +140,6 @@ public class UnsafeSortDataRows {
     }
   }
 
-  /**
-   * This method will be used to add new row
-   */
   public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
     if (rowPage == null) {
       return;
@@ -257,10 +175,8 @@ public class UnsafeSortDataRows {
    * Below method will be used to start sorting process. This method will get
    * all the temp unsafe pages in memory and all the temp files and try to merge them if possible.
    * Also, it will spill the pages to disk or add it to unsafe sort memory.
-   *
-   * @throws InterruptedException if error occurs during data sort and write
    */
-  public void startSorting() throws InterruptedException {
+  public void startSorting() {
     LOGGER.info("Unsafe based sorting will be used");
     if (this.rowPage.getUsedSize() > 0) {
       TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
@@ -276,22 +192,6 @@ public class UnsafeSortDataRows {
     } else {
       rowPage.freeMemory();
     }
-    startFileBasedMerge();
-  }
-
-  /**
-   * Deal with the previous pages added to sort-memory. Carbondata will merge the in-memory pages
-   * or merge the sort temp files if possible. After that, carbondata will add current page to
-   * sort memory or just spill them.
-   */
-  private void handlePreviousPage()
-      throws InterruptedException {
-    if (enableInMemoryIntermediateMerge) {
-      unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
-    }
-    unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
-    semaphore.acquire();
-    dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
   }
 
   /**
@@ -323,23 +223,6 @@ public class UnsafeSortDataRows {
   }
 
   /**
-   * This method will be used to delete sort temp location is it is exites
-   */
-  public void deleteSortLocationIfExists() {
-    CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
-  }
-
-  /**
-   * Below method will be used to start file based merge
-   *
-   * @throws InterruptedException
-   */
-  private void startFileBasedMerge() throws InterruptedException {
-    dataSorterAndWriterExecutorService.shutdown();
-    dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
-  }
-
-  /**
    * Observer class for thread execution
    * In case of any failure we need stop all the running thread
    */
@@ -351,8 +234,6 @@ public class UnsafeSortDataRows {
      * @throws CarbonSortKeyAndGroupByException
      */
     public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
-      semaphore.release();
-      dataSorterAndWriterExecutorService.shutdownNow();
       unsafeInMemoryIntermediateFileMerger.close();
       parameters.getObserver().setFailed(true);
       LOGGER.error(exception);
@@ -361,80 +242,69 @@ public class UnsafeSortDataRows {
   }
 
   /**
-   * This class is responsible for sorting and writing the object
-   * array which holds the records equal to given array size
+   * Deal with the previous pages added to sort-memory. Carbondata will merge the in-memory pages
+   * or merge the sort temp files if possible. After that, carbondata will add current page to
+   * sort memory or just spill them.
    */
-  private class DataSorterAndWriter implements Runnable {
-    private UnsafeCarbonRowPage page;
-
-    public DataSorterAndWriter(UnsafeCarbonRowPage rowPage) {
-      this.page = rowPage;
-    }
-
-    @Override
-    public void run() {
+  private void handlePreviousPage() {
+    try {
+      long startTime = System.currentTimeMillis();
+      TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
+              new UnsafeIntSortDataFormat(rowPage));
+      // if sort_columns is not none, sort by sort_columns
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
+        timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
+                new UnsafeRowComparator(rowPage));
+      } else {
+        timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
+                new UnsafeRowComparatorForNormalDims(rowPage));
+      }
+      // get sort storage memory block if memory is available in sort storage manager
+      // if space is available then store it in memory, if memory is not available
+      // then spill to disk
+      MemoryBlock sortStorageMemoryBlock = null;
+      if (!rowPage.isSaveToDisk()) {
+        sortStorageMemoryBlock =
+            UnsafeSortMemoryManager.INSTANCE.allocateMemory(taskId, rowPage.getDataBlock().size());
+      }
+      if (null == sortStorageMemoryBlock || rowPage.isSaveToDisk()) {
+        // create a new file every time
+        // create a new file and pick a temp directory randomly every time
+        String tmpDir = parameters.getTempFileLocation()[
+                new Random().nextInt(parameters.getTempFileLocation().length)];
+        File sortTempFile = new File(tmpDir + File.separator + parameters.getTableName()
+                + '_' + parameters.getRangeId() + '_' + instanceId + '_' + System.nanoTime()
+                + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+        writeDataToFile(rowPage, sortTempFile);
+        LOGGER.info("Time taken to sort row page with size" + rowPage.getBuffer().getActualSize()
+                + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
+                + sortTempFile + ", sort temp file size in MB is "
+                + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
+        rowPage.freeMemory();
+        // add sort temp filename to and arrayList. When the list size reaches 20 then
+        // intermediate merging of sort temp files will be triggered
+        unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
+      } else {
+        // copying data from working memory manager block to storage memory manager block
+        CarbonUnsafe.getUnsafe().copyMemory(
+                rowPage.getDataBlock().getBaseObject(), rowPage.getDataBlock().getBaseOffset(),
+                sortStorageMemoryBlock.getBaseObject(), sortStorageMemoryBlock.getBaseOffset(),
+                rowPage.getDataBlock().size());
+        // free unsafememory manager
+        rowPage.freeMemory();
+        rowPage.setNewDataBlock(sortStorageMemoryBlock);
+        // add sort temp filename to and arrayList. When the list size reaches 20 then
+        // intermediate merging of sort temp files will be triggered
+        rowPage.getBuffer().loadToUnsafe();
+        unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
+        LOGGER.info("Time taken to sort row page with size: " + rowPage.getBuffer().getActualSize()
+                + " is: " + (System.currentTimeMillis() - startTime));
+      }
+    } catch (Throwable e) {
       try {
-        long startTime = System.currentTimeMillis();
-        TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
-            new UnsafeIntSortDataFormat(page));
-        // if sort_columns is not none, sort by sort_columns
-        if (parameters.getNumberOfNoDictSortColumns() > 0) {
-          timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
-              new UnsafeRowComparator(page));
-        } else {
-          timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
-              new UnsafeRowComparatorForNormalDims(page));
-        }
-        // get sort storage memory block if memory is available in sort storage manager
-        // if space is available then store it in memory, if memory is not available
-        // then spill to disk
-        MemoryBlock sortStorageMemoryBlock = null;
-        if (!page.isSaveToDisk()) {
-          sortStorageMemoryBlock =
-              UnsafeSortMemoryManager.INSTANCE.allocateMemory(taskId, page.getDataBlock().size());
-        }
-        if (null == sortStorageMemoryBlock || page.isSaveToDisk()) {
-          // create a new file every time
-          // create a new file and pick a temp directory randomly every time
-          String tmpDir = parameters.getTempFileLocation()[
-              new Random().nextInt(parameters.getTempFileLocation().length)];
-          File sortTempFile = new File(tmpDir + File.separator + parameters.getTableName()
-              + '_' + parameters.getRangeId() + '_' + System.nanoTime()
-              + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
-          writeDataToFile(page, sortTempFile);
-          LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
-              + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
-              + sortTempFile + ", sort temp file size in MB is "
-              + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
-          page.freeMemory();
-          // add sort temp filename to and arrayList. When the list size reaches 20 then
-          // intermediate merging of sort temp files will be triggered
-          unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
-        } else {
-          // copying data from working memory manager block to storage memory manager block
-          CarbonUnsafe.getUnsafe()
-              .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(),
-                  sortStorageMemoryBlock.getBaseObject(),
-                  sortStorageMemoryBlock.getBaseOffset(), page.getDataBlock().size());
-          // free unsafememory manager
-          page.freeMemory();
-          page.setNewDataBlock(sortStorageMemoryBlock);
-          // add sort temp filename to and arrayList. When the list size reaches 20 then
-          // intermediate merging of sort temp files will be triggered
-          page.getBuffer().loadToUnsafe();
-          unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(page);
-          LOGGER.info(
-              "Time taken to sort row page with size: " + page.getBuffer().getActualSize() + " is: "
-                  + (System.currentTimeMillis() - startTime));
-        }
-      } catch (Throwable e) {
-        try {
-          threadStatusObserver.notifyFailed(e);
-        } catch (CarbonSortKeyAndGroupByException ex) {
-          LOGGER.error(e.getMessage(), e);
-        }
-      } finally {
-        semaphore.release();
+        threadStatusObserver.notifyFailed(e);
+      } catch (CarbonSortKeyAndGroupByException ex) {
+        LOGGER.error(e.getMessage(), e);
       }
     }
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index 832bf80..6505ce0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -47,13 +47,8 @@ public class UnsafeIntermediateMerger {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(UnsafeIntermediateMerger.class.getName());
 
-  /**
-   * executorService
-   */
   private ExecutorService executorService;
-  /**
-   * rowPages
-   */
+
   private List<UnsafeCarbonRowPage> rowPages;
 
   private List<UnsafeInMemoryIntermediateDataMerger> mergedPages;
@@ -65,9 +60,7 @@ public class UnsafeIntermediateMerger {
   private List<File> procFiles;
 
   private List<Future<Void>> mergerTask;
-  /**
-   * size to be spilled in sort memory
-   */
+
   private long spillSizeInSortMemory;
 
   public UnsafeIntermediateMerger(SortParameters parameters) {
@@ -91,7 +84,6 @@ public class UnsafeIntermediateMerger {
           " less than the page size " + inMemoryChunkSizeInMB * 1024 * 1024 +
           ",so no merge and spill in-memory pages to disk");
     }
-
   }
 
   public void addDataChunkToMerge(UnsafeCarbonRowPage rowPage) {
@@ -99,6 +91,9 @@ public class UnsafeIntermediateMerger {
     // intermediate merging of sort temp files will be triggered
     synchronized (lockObject) {
       rowPages.add(rowPage);
+      if (rowPages.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
+        tryTriggerInMemoryMerging(false);
+      }
     }
   }
 
@@ -107,28 +102,14 @@ public class UnsafeIntermediateMerger {
     // intermediate merging of sort temp files will be triggered
     synchronized (lockObject) {
       procFiles.add(sortTempFile);
-    }
-  }
-
-  public void startFileMergingIfPossible() {
-    File[] fileList;
-    if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      synchronized (lockObject) {
-        fileList = procFiles.toArray(new File[procFiles.size()]);
-        this.procFiles = new ArrayList<File>();
-      }
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
+      if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
+        File[] fileList = procFiles.toArray(new File[procFiles.size()]);
+        this.procFiles = new ArrayList<>();
+        startIntermediateMerging(fileList);
       }
-      startIntermediateMerging(fileList);
     }
   }
 
-  /**
-   * Below method will be used to start the intermediate file merging
-   *
-   * @param intermediateFiles
-   */
   private void startIntermediateMerging(File[] intermediateFiles) {
     //pick a temp location randomly
     String[] tempFileLocations = parameters.getTempFileLocation();
@@ -139,6 +120,10 @@ public class UnsafeIntermediateMerger {
         + CarbonCommonConstants.MERGERD_EXTENSION);
     UnsafeIntermediateFileMerger merger =
         new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Submitting request for intermediate merging number of files: "
+              + intermediateFiles.length);
+    }
     mergerTask.add(executorService.submit(merger));
   }
 
@@ -160,21 +145,11 @@ public class UnsafeIntermediateMerger {
       }
     }
     if (pages2Merge.size() > 1) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Sumitting request for intermediate merging of in-memory pages : "
-            + pages2Merge.size());
-      }
       startIntermediateMerging(pages2Merge.toArray(new UnsafeCarbonRowPage[pages2Merge.size()]),
           totalRows2Merge, spillDisk);
     }
   }
 
-  public void startInmemoryMergingIfPossible() {
-    if (rowPages.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      tryTriggerInMemoryMerging(false);
-    }
-  }
-
   /**
    * Below method will be used to start the intermediate inmemory merging
    *
@@ -187,6 +162,10 @@ public class UnsafeIntermediateMerger {
     UnsafeInMemoryIntermediateDataMerger merger =
         new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows, parameters, spillDisk);
     mergedPages.add(merger);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Submitting request for intermediate merging of in-memory pages: "
+              + rowPages.length);
+    }
     mergerTask.add(executorService.submit(merger));
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 3d7d465..08273b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -455,6 +455,10 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     dimensionColumnCount = dimensions.size();
     sortParameters = createSortParameters();
     intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
+    // Delete if any older file exists in sort temp folder
+    CarbonDataProcessorUtil.deleteSortLocationIfExists(sortParameters.getTempFileLocation());
+    // create new sort temp directory
+    CarbonDataProcessorUtil.createLocations(sortParameters.getTempFileLocation());
     // TODO: Now it is only supported onheap merge, but we can have unsafe merge
     // as well by using UnsafeSortDataRows.
     this.sortDataRows = new SortDataRows(sortParameters, intermediateFileMerger);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index d03ca74..4ff3a0f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -23,59 +23,37 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
 import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.log4j.Logger;
 
 public class SortDataRows {
-  /**
-   * LOGGER
-   */
+
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(SortDataRows.class.getName());
-  /**
-   * entryCount
-   */
+
   private int entryCount;
-  /**
-   * record holder array
-   */
+
   private Object[][] recordHolderList;
-  /**
-   * threadStatusObserver
-   */
+
   private ThreadStatusObserver threadStatusObserver;
-  /**
-   * executor service for data sort holder
-   */
-  private ExecutorService dataSorterAndWriterExecutorService;
-  /**
-   * semaphore which will used for managing sorted data object arrays
-   */
-  private Semaphore semaphore;
 
   private SortParameters parameters;
   private SortStepRowHandler sortStepRowHandler;
   private ThreadLocal<ReUsableByteArrayDataOutputStream> reUsableByteArrayDataOutputStream;
   private int sortBufferSize;
 
-  private SortIntermediateFileMerger intermediateFileMerger;
+  private int instanceId;
 
-  private final Object addRowsLock = new Object();
+  private SortIntermediateFileMerger intermediateFileMerger;
 
   public SortDataRows(SortParameters parameters,
       SortIntermediateFileMerger intermediateFileMerger) {
@@ -97,33 +75,17 @@ public class SortDataRows {
     };
   }
 
-  /**
-   * This method will be used to initialize
-   */
   public void initialize() {
-
     // create holder list which will hold incoming rows
     // size of list will be sort buffer size + 1 to avoid creation of new
     // array in list array
     this.recordHolderList = new Object[sortBufferSize][];
-    // Delete if any older file exists in sort temp folder
-    deleteSortLocationIfExists();
+  }
 
-    // create new sort temp directory
-    CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
-    this.dataSorterAndWriterExecutorService = Executors
-        .newFixedThreadPool(parameters.getNumberOfCores(),
-            new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName(),
-                    true));
-    semaphore = new Semaphore(parameters.getNumberOfCores());
+  public void setInstanceId(int instanceId) {
+    this.instanceId = instanceId;
   }
 
-  /**
-   * This method will be used to add new row
-   *
-   * @param row new row
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
   public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
@@ -133,15 +95,8 @@ public class SortDataRows {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("************ Writing to temp file ********** ");
       }
-      intermediateFileMerger.startMergingIfPossible();
       Object[][] recordHolderListLocal = recordHolderList;
-      try {
-        semaphore.acquire();
-        dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
-      } catch (InterruptedException e) {
-        LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e);
-        throw new CarbonSortKeyAndGroupByException(e);
-      }
+      handlePreviousPage(recordHolderListLocal);
       // create the new holder Array
       this.recordHolderList = new Object[this.sortBufferSize][];
       this.entryCount = 0;
@@ -149,46 +104,65 @@ public class SortDataRows {
     recordHolderList[entryCount++] = row;
   }
 
-  /**
-   * This method will be used to add new row
-   *
-   * @param rowBatch new rowBatch
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
   public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
-    synchronized (addRowsLock) {
-      int sizeLeft = 0;
-      if (entryCount + size >= sortBufferSize) {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("************ Writing to temp file ********** ");
-        }
-        intermediateFileMerger.startMergingIfPossible();
-        Object[][] recordHolderListLocal = recordHolderList;
-        sizeLeft = sortBufferSize - entryCount;
-        if (sizeLeft > 0) {
-          System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
-        }
-        try {
-          semaphore.acquire();
-          dataSorterAndWriterExecutorService
-              .execute(new DataSorterAndWriter(recordHolderListLocal));
-        } catch (Exception e) {
-          LOGGER.error(
-              "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e);
-          throw new CarbonSortKeyAndGroupByException(e);
-        }
-        // create the new holder Array
-        this.recordHolderList = new Object[this.sortBufferSize][];
-        this.entryCount = 0;
-        size = size - sizeLeft;
-        if (size == 0) {
-          return;
-        }
+    int sizeLeft = 0;
+    if (entryCount + size >= sortBufferSize) {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("************ Writing to temp file ********** ");
+      }
+      Object[][] recordHolderListLocal = recordHolderList;
+      sizeLeft = sortBufferSize - entryCount;
+      if (sizeLeft > 0) {
+        System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
+      }
+      handlePreviousPage(recordHolderListLocal);
+      // create the new holder Array
+      this.recordHolderList = new Object[this.sortBufferSize][];
+      this.entryCount = 0;
+      size = size - sizeLeft;
+      if (size == 0) {
+        return;
+      }
+    }
+    System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
+    entryCount += size;
+  }
+
+  /**
+   * sort and write data
+   * @param recordHolderArray
+   */
+  private void handlePreviousPage(Object[][] recordHolderArray)
+          throws CarbonSortKeyAndGroupByException {
+    try {
+      long startTime = System.currentTimeMillis();
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
+        Arrays.sort(recordHolderArray,
+                new NewRowComparator(parameters.getNoDictionarySortColumn(),
+                        parameters.getNoDictDataType()));
+      } else {
+        Arrays.sort(recordHolderArray,
+                new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
       }
-      System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
-      entryCount += size;
+
+      // create a new file and choose folder randomly every time
+      String[] tmpFileLocation = parameters.getTempFileLocation();
+      String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)];
+      File sortTempFile = new File(
+              locationChosen + File.separator + parameters.getTableName()
+                      + '_' + parameters.getRangeId() + '_' + instanceId + '_' + System.nanoTime()
+                      + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+      writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile);
+      // add sort temp filename to arrayList. When the list size reaches 20 then
+      // intermediate merging of sort temp files will be triggered
+      intermediateFileMerger.addFileToMerge(sortTempFile);
+      LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + (
+              System.currentTimeMillis() - startTime) + ", sort temp file size in MB is "
+              + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
+    } catch (Throwable e) {
+      threadStatusObserver.notifyFailed(e);
     }
   }
 
@@ -218,12 +192,11 @@ public class SortDataRows {
       String[] tmpLocation = parameters.getTempFileLocation();
       String locationChosen = tmpLocation[new Random().nextInt(tmpLocation.length)];
       File file = new File(locationChosen + File.separator + parameters.getTableName()
-          + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+          + '_' + parameters.getRangeId() + '_' + instanceId + '_' + System.nanoTime()
           + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
       writeDataToFile(recordHolderList, this.entryCount, file);
     }
 
-    startFileBasedMerge();
     this.recordHolderList = null;
   }
 
@@ -254,28 +227,6 @@ public class SortDataRows {
   }
 
   /**
-   * This method will be used to delete sort temp location is it is exites
-   *
-   */
-  private void deleteSortLocationIfExists() {
-    CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
-  }
-
-  /**
-   * Below method will be used to start file based merge
-   *
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private void startFileBasedMerge() throws CarbonSortKeyAndGroupByException {
-    try {
-      dataSorterAndWriterExecutorService.shutdown();
-      dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
-    }
-  }
-
-  /**
    * Observer class for thread execution
    * In case of any failure we need stop all the running thread
    */
@@ -295,61 +246,8 @@ public class SortDataRows {
   }
 
   public void close() {
-    if (null != dataSorterAndWriterExecutorService && !dataSorterAndWriterExecutorService
-        .isShutdown()) {
-      dataSorterAndWriterExecutorService.shutdownNow();
-    }
     intermediateFileMerger.close();
   }
 
-  /**
-   * This class is responsible for sorting and writing the object
-   * array which holds the records equal to given array size
-   */
-  private class DataSorterAndWriter implements Runnable {
-    private Object[][] recordHolderArray;
-
-    public DataSorterAndWriter(Object[][] recordHolderArray) {
-      this.recordHolderArray = recordHolderArray;
-    }
-
-    @Override
-    public void run() {
-      try {
-        long startTime = System.currentTimeMillis();
-        if (parameters.getNumberOfNoDictSortColumns() > 0) {
-          Arrays.sort(recordHolderArray,
-              new NewRowComparator(parameters.getNoDictionarySortColumn(),
-                  parameters.getNoDictDataType()));
-        } else {
-          Arrays.sort(recordHolderArray,
-              new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
-        }
-
-        // create a new file and choose folder randomly every time
-        String[] tmpFileLocation = parameters.getTempFileLocation();
-        String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)];
-        File sortTempFile = new File(
-            locationChosen + File.separator + parameters.getTableName()
-                + '_' + parameters.getRangeId() + '_' + System.nanoTime()
-                + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
-        writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile);
-        // add sort temp filename to and arrayList. When the list size reaches 20 then
-        // intermediate merging of sort temp files will be triggered
-        intermediateFileMerger.addFileToMerge(sortTempFile);
-        LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + (
-            System.currentTimeMillis() - startTime) + ", sort temp file size in MB is "
-            + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
-      } catch (Throwable e) {
-        try {
-          threadStatusObserver.notifyFailed(e);
-        } catch (CarbonSortKeyAndGroupByException ex) {
-          LOGGER.error(ex);
-        }
-      } finally {
-        semaphore.release();
-      }
-    }
-  }
 }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
index cbd4948..1c9f9c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
@@ -42,13 +42,8 @@ public class SortIntermediateFileMerger {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(SortIntermediateFileMerger.class.getName());
 
-  /**
-   * executorService
-   */
   private ExecutorService executorService;
-  /**
-   * procFiles
-   */
+
   private List<File> procFiles;
 
   private SortParameters parameters;
@@ -72,20 +67,11 @@ public class SortIntermediateFileMerger {
     // intermediate merging of sort temp files will be triggered
     synchronized (lockObject) {
       procFiles.add(sortTempFile);
-    }
-  }
-
-  public void startMergingIfPossible() {
-    File[] fileList;
-    if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      synchronized (lockObject) {
-        fileList = procFiles.toArray(new File[procFiles.size()]);
-        this.procFiles = new ArrayList<File>();
+      if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
+        File[] fileList = procFiles.toArray(new File[procFiles.size()]);
+        this.procFiles = new ArrayList<>();
+        startIntermediateMerging(fileList);
       }
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
-      }
-      startIntermediateMerging(fileList);
     }
   }
 
@@ -101,6 +87,10 @@ public class SortIntermediateFileMerger {
         + '_' + parameters.getRangeId() + '_' + System.nanoTime()
         + CarbonCommonConstants.MERGERD_EXTENSION);
     IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Submitting request for intermediate merging number of files: "
+              + intermediateFiles.length);
+    }
     mergerTask.add(executorService.submit(merger));
   }