Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/02 08:02:07 UTC

[46/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index f605b22..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
-import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data separately and write to
- * temp files.
- */
-public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(
-                UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName());
-
-  private SortParameters sortParameters;
-
-  private BucketingInfo bucketingInfo;
-
-  public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
-      BucketingInfo bucketingInfo) {
-    this.bucketingInfo = bucketingInfo;
-  }
-
-  @Override public void initialize(SortParameters sortParameters) {
-    this.sortParameters = sortParameters;
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
-      throws CarbonDataLoadingException {
-    UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
-    UnsafeIntermediateMerger[] intermediateFileMergers =
-        new UnsafeIntermediateMerger[sortDataRows.length];
-    int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-    inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / bucketingInfo.getNumberOfBuckets();
-    if (inMemoryChunkSizeInMB < 5) {
-      inMemoryChunkSizeInMB = 5;
-    }
-    try {
-      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-        SortParameters parameters = sortParameters.getCopy();
-        parameters.setPartitionID(i + "");
-        setTempLocation(parameters);
-        intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
-        sortDataRows[i] =
-            new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
-        sortDataRows[i].initialize();
-      }
-    } catch (MemoryException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
-    this.threadStatusObserver = new ThreadStatusObserver(executorService);
-    final int batchSize = CarbonProperties.getInstance().getBatchSize();
-    try {
-      for (int i = 0; i < iterators.length; i++) {
-        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, this
-            .threadStatusObserver));
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-      processRowToNextStep(sortDataRows, sortParameters);
-    } catch (Exception e) {
-      checkError();
-      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
-    }
-    checkError();
-    try {
-      for (int i = 0; i < intermediateFileMergers.length; i++) {
-        intermediateFileMergers[i].finish();
-      }
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-
-    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
-    for (int i = 0; i < sortDataRows.length; i++) {
-      batchIterator[i] = new MergedDataIterator(batchSize, intermediateFileMergers[i]);
-    }
-
-    return batchIterator;
-  }
-
-  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger() {
-    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        sortParameters.getDatabaseName(), sortParameters.getTableName(),
-        String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
-        false, false);
-    // Set the data file location
-    String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
-        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation);
-  }
-
-  @Override public void close() {
-  }
-
-  /**
-   * Below method will be used to process data to next step
-   */
-  private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
-      throws CarbonDataLoadingException {
-    if (null == sortDataRows || sortDataRows.length == 0) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
-    try {
-      for (int i = 0; i < sortDataRows.length; i++) {
-        // start sorting
-        sortDataRows[i].startSorting();
-      }
-      // check any more rows are present
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      return false;
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  private void setTempLocation(SortParameters parameters) {
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
-            parameters.getTaskNo(), parameters.getSegmentId(),
-            false, false);
-    String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
-        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    parameters.setTempFileLocation(tmpLoc);
-  }
-
-  /**
-   * This thread iterates the iterator and adds the rows to @{@link UnsafeSortDataRows}
-   */
-  private static class SortIteratorThread implements Runnable {
-
-    private Iterator<CarbonRowBatch> iterator;
-
-    private UnsafeSortDataRows[] sortDataRows;
-
-    private ThreadStatusObserver threadStatusObserver;
-
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
-        UnsafeSortDataRows[] sortDataRows, ThreadStatusObserver threadStatusObserver) {
-      this.iterator = iterator;
-      this.sortDataRows = sortDataRows;
-      this.threadStatusObserver = threadStatusObserver;
-    }
-
-    @Override
-    public void run() {
-      try {
-        while (iterator.hasNext()) {
-          CarbonRowBatch batch = iterator.next();
-          int i = 0;
-          while (batch.hasNext()) {
-            CarbonRow row = batch.next();
-            if (row != null) {
-              UnsafeSortDataRows sortDataRow = sortDataRows[row.bucketNumber];
-              synchronized (sortDataRow) {
-                sortDataRow.addRow(row.getData());
-              }
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-        this.threadStatusObserver.notifyFailed(e);
-      }
-    }
-
-  }
-
-  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
-
-
-    private int batchSize;
-
-    private boolean firstRow;
-
-    private UnsafeIntermediateMerger intermediateMerger;
-
-    public MergedDataIterator(int batchSize,
-        UnsafeIntermediateMerger intermediateMerger) {
-      this.batchSize = batchSize;
-      this.intermediateMerger = intermediateMerger;
-      this.firstRow = true;
-    }
-
-    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
-
-    @Override public boolean hasNext() {
-      if (firstRow) {
-        firstRow = false;
-        finalMerger = getFinalMerger();
-        List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
-        finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
-            intermediateMerger.getMergedPages());
-      }
-      return finalMerger.hasNext();
-    }
-
-    @Override public CarbonRowBatch next() {
-      int counter = 0;
-      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
-      while (finalMerger.hasNext() && counter < batchSize) {
-        rowBatch.addRow(new CarbonRow(finalMerger.next()));
-        counter++;
-      }
-      return rowBatch;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
new file mode 100644
index 0000000..99d6627
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.ColumnRangeInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * It reads data in parallel from an array of iterators and performs a merge sort.
+ * First it sorts the data and writes it to temp files; these temp files are then merge-sorted
+ * to produce the final sorted result.
+ * This step is specifically for data loading with a specified column value range (such as
+ * bucketing or sort_column_bounds); it sorts each range of data separately and writes to temp files.
+ */
+public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(
+          UnsafeParallelReadMergeSorterWithColumnRangeImpl.class.getName());
+
+  private SortParameters originSortParameters;
+  private UnsafeIntermediateMerger[] intermediateFileMergers;
+  private int inMemoryChunkSizeInMB;
+  private AtomicLong rowCounter;
+  private ColumnRangeInfo columnRangeInfo;
+  /**
+   * counters to collect information about rows processed by each range
+   */
+  private List<AtomicLong> insideRowCounterList;
+
+  public UnsafeParallelReadMergeSorterWithColumnRangeImpl(AtomicLong rowCounter,
+      ColumnRangeInfo columnRangeInfo) {
+    this.rowCounter = rowCounter;
+    this.columnRangeInfo = columnRangeInfo;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.originSortParameters = sortParameters;
+    int totalInMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+    inMemoryChunkSizeInMB = totalInMemoryChunkSizeInMB / columnRangeInfo.getNumOfRanges();
+    if (inMemoryChunkSizeInMB < 5) {
+      inMemoryChunkSizeInMB = 5;
+    }
+    this.insideRowCounterList = new ArrayList<>(columnRangeInfo.getNumOfRanges());
+    for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+      insideRowCounterList.add(new AtomicLong(0));
+    }
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[columnRangeInfo.getNumOfRanges()];
+    intermediateFileMergers = new UnsafeIntermediateMerger[columnRangeInfo.getNumOfRanges()];
+    SortParameters[] sortParameterArray = new SortParameters[columnRangeInfo.getNumOfRanges()];
+    try {
+      for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+        SortParameters parameters = originSortParameters.getCopy();
+        parameters.setPartitionID(i + "");
+        parameters.setRangeId(i);
+        sortParameterArray[i] = parameters;
+        setTempLocation(parameters);
+        intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
+        sortDataRows[i] =
+            new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
+        sortDataRows[i].initialize();
+      }
+    } catch (MemoryException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(executorService);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
+            this.insideRowCounterList, this.threadStatusObserver));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRows, originSortParameters);
+    } catch (Exception e) {
+      checkError();
+      throw new CarbonDataLoadingException("Problem while shutting down the sort executor ", e);
+    }
+    checkError();
+    try {
+      for (int i = 0; i < intermediateFileMergers.length; i++) {
+        intermediateFileMergers[i].finish();
+      }
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[columnRangeInfo.getNumOfRanges()];
+    for (int i = 0; i < sortDataRows.length; i++) {
+      batchIterator[i] =
+          new MergedDataIterator(sortParameterArray[i], batchSize, intermediateFileMergers[i]);
+    }
+
+    return batchIterator;
+  }
+
+  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) {
+    String[] storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()),
+            sortParameters.getSegmentId() + "", false, false);
+    // Set the data file location
+    String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
+        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation);
+  }
+
+  @Override public void close() {
+    for (int i = 0; i < intermediateFileMergers.length; i++) {
+      intermediateFileMergers[i].close();
+    }
+  }
+
+  /**
+   * Below method starts sorting for each range and records the sort-step statistics
+   * before the data is handed to the next step
+   */
+  private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows || sortDataRows.length == 0) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        // start sorting
+        sortDataRows[i].startSorting();
+      }
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  private void setTempLocation(SortParameters parameters) {
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
+            parameters.getTaskNo(), parameters.getSegmentId(),
+            false, false);
+    String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
+        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    LOGGER.info("set temp location: " + StringUtils.join(tmpLoc, ", "));
+    parameters.setTempFileLocation(tmpLoc);
+  }
+
+  /**
+   * This thread iterates over the iterator and adds the rows to {@link UnsafeSortDataRows}
+   */
+  private static class SortIteratorThread implements Runnable {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private UnsafeSortDataRows[] sortDataRows;
+    private AtomicLong rowCounter;
+    private List<AtomicLong> insideRowCounterList;
+    private ThreadStatusObserver threadStatusObserver;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+        UnsafeSortDataRows[] sortDataRows, AtomicLong rowCounter,
+        List<AtomicLong> insideRowCounterList,
+        ThreadStatusObserver threadStatusObserver) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.rowCounter = rowCounter;
+      this.insideRowCounterList = insideRowCounterList;
+      this.threadStatusObserver = threadStatusObserver;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          int i = 0;
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              UnsafeSortDataRows sortDataRow = sortDataRows[row.getRangeId()];
+              synchronized (sortDataRow) {
+                rowCounter.getAndIncrement();
+                insideRowCounterList.get(row.getRangeId()).getAndIncrement();
+                sortDataRow.addRow(row.getData());
+              }
+            }
+          }
+        }
+        LOGGER.info("Rows processed by each range: " + insideRowCounterList);
+      } catch (Exception e) {
+        LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
+      }
+    }
+  }
+
+  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private SortParameters sortParameters;
+
+    private int batchSize;
+
+    private boolean firstRow;
+
+    private UnsafeIntermediateMerger intermediateMerger;
+
+    public MergedDataIterator(SortParameters sortParameters, int batchSize,
+        UnsafeIntermediateMerger intermediateMerger) {
+      this.sortParameters = sortParameters;
+      this.batchSize = batchSize;
+      this.intermediateMerger = intermediateMerger;
+      this.firstRow = true;
+    }
+
+    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+    @Override public boolean hasNext() {
+      if (firstRow) {
+        firstRow = false;
+        finalMerger = getFinalMerger(sortParameters);
+        List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
+        finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+            intermediateMerger.getMergedPages());
+      }
+      return finalMerger.hasNext();
+    }
+
+    @Override public CarbonRowBatch next() {
+      int counter = 0;
+      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+      while (finalMerger.hasNext() && counter < batchSize) {
+        rowBatch.addRow(new CarbonRow(finalMerger.next()));
+        counter++;
+      }
+      return rowBatch;
+    }
+  }
+}
\ No newline at end of file
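
The new sorter above follows a dispatch pattern: one sorter and one intermediate merger per value range, with every reader thread routing each row to the sorter for that row's range under a lock. Below is a minimal, self-contained sketch of that pattern; RangeSorter and the modulo partitioner are illustrative stand-ins, not CarbonData APIs — the synchronized block mirrors how SortIteratorThread guards the shared per-range sorter.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RangeDispatchSketch {
  // Illustrative stand-in for UnsafeSortDataRows: buffers rows of one range.
  static class RangeSorter {
    final List<int[]> rows = new ArrayList<>();
    void addRow(int[] row) { rows.add(row); }
  }

  public static void main(String[] args) throws InterruptedException {
    final int numRanges = 3;
    final RangeSorter[] sorters = new RangeSorter[numRanges];
    for (int i = 0; i < numRanges; i++) sorters[i] = new RangeSorter();

    // Two reader threads feeding the step, as in the sort() method above.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    for (int t = 0; t < 2; t++) {
      final int offset = t;
      pool.execute(() -> {
        for (int v = offset; v < 100; v += 2) {
          int rangeId = v % numRanges;     // a partitioner decides the range
          RangeSorter target = sorters[rangeId];
          synchronized (target) {          // same guard as SortIteratorThread
            target.addRow(new int[] { v });
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    for (int i = 0; i < numRanges; i++) {
      System.out.println("range " + i + " got " + sorters[i].rows.size() + " rows");
    }
  }
}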

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 5d038d3..eaa858e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -366,9 +366,9 @@ public class UnsafeSortDataRows {
           // create a new file and pick a temp directory randomly every time
           String tmpDir = parameters.getTempFileLocation()[
               new Random().nextInt(parameters.getTempFileLocation().length)];
-          File sortTempFile = new File(
-              tmpDir + File.separator + parameters.getTableName()
-                  + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+          File sortTempFile = new File(tmpDir + File.separator + parameters.getTableName()
+              + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+              + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
           writeDataToFile(page, sortTempFile);
           LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
               + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index 0d24e01..104f3f5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -112,9 +112,9 @@ public class UnsafeIntermediateMerger {
     String[] tempFileLocations = parameters.getTempFileLocation();
     String targetLocation = tempFileLocations[new Random().nextInt(tempFileLocations.length)];
 
-    File file = new File(
-        targetLocation + File.separator + parameters.getTableName() + System
-            .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
+    File file = new File(targetLocation + File.separator + parameters.getTableName()
+        + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+        + CarbonCommonConstants.MERGERD_EXTENSION);
     UnsafeIntermediateFileMerger merger =
         new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
     mergerTask.add(executorService.submit(merger));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 64f3c25..b1dc156 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -147,19 +147,20 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   }
 
   private List<File> getFilesToMergeSort() {
+    // this can be partitionId, bucketId or rangeId, let's call it rangeId
+    final int rangeId = parameters.getRangeId();
+
     FileFilter fileFilter = new FileFilter() {
       public boolean accept(File pathname) {
-        return pathname.getName().startsWith(tableName);
+        return pathname.getName().startsWith(tableName + '_' + rangeId);
       }
     };
 
     // get all the merged files
     List<File> files = new ArrayList<File>(tempFileLocation.length);
-    for (String tempLoc : tempFileLocation)
-    {
+    for (String tempLoc : tempFileLocation) {
       File[] subFiles = new File(tempLoc).listFiles(fileFilter);
-      if (null != subFiles && subFiles.length > 0)
-      {
+      if (null != subFiles && subFiles.length > 0) {
         files.addAll(Arrays.asList(subFiles));
       }
     }
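
This is the reader side of the naming change: the final merger now selects only the files whose name starts with its own tableName_rangeId prefix. A self-contained sketch of that filter follows; note that it appends a trailing '_' to the prefix so that, say, range 1 cannot pick up range 12's files, whereas the hunk above matches on the bare prefix. Directory and names here are placeholders.

import java.io.File;
import java.io.FileFilter;

public class RangeFileFilterSketch {
  public static void main(String[] args) {
    final String tableName = "sales";   // placeholder
    final int rangeId = 2;              // placeholder
    FileFilter fileFilter = new FileFilter() {
      public boolean accept(File pathname) {
        // matches sales_2_<nanoTime>.sorttemp but not sales_12_...
        return pathname.getName().startsWith(tableName + '_' + rangeId + '_');
      }
    };
    File[] subFiles = new File(System.getProperty("java.io.tmpdir")).listFiles(fileFilter);
    if (subFiles != null) {
      for (File f : subFiles) System.out.println(f.getName());
    }
  }
}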

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index 90a340d..72a8c25 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -19,21 +19,33 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.BadRecordsLogger;
 import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
 import org.apache.carbondata.processing.loading.converter.RowConverter;
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.partition.Partitioner;
+import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
+import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerImpl;
+import org.apache.carbondata.processing.loading.partition.impl.RawRowComparator;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
+import org.apache.commons.lang3.StringUtils;
+
 /**
  * Replace row data fields with dictionary values if column is configured dictionary encoded.
  * And nondictionary columns as well as complex columns will be converted to byte[].
@@ -41,7 +53,10 @@ import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private List<RowConverter> converters;
+  private Partitioner<CarbonRow> partitioner;
   private BadRecordsLogger badRecordLogger;
+  private boolean isSortColumnRangeEnabled = false;
+  private boolean isBucketColumnEnabled = false;
 
   public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
@@ -64,6 +79,81 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     configuration.setCardinalityFinder(converter);
     converters.add(converter);
     converter.initialize();
+
+    if (null != configuration.getBucketingInfo()) {
+      this.isBucketColumnEnabled = true;
+      initializeBucketColumnPartitioner();
+    } else if (null != configuration.getSortColumnRangeInfo()) {
+      this.isSortColumnRangeEnabled = true;
+      initializeSortColumnRangesPartitioner();
+    }
+  }
+
+  /**
+   * initialize partitioner for bucket column
+   */
+  private void initializeBucketColumnPartitioner() {
+    List<Integer> indexes = new ArrayList<>();
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    DataField[] inputDataFields = getOutput();
+    BucketingInfo bucketingInfo = configuration.getBucketingInfo();
+    for (int i = 0; i < inputDataFields.length; i++) {
+      for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) {
+        if (inputDataFields[i].getColumn().getColName()
+            .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) {
+          indexes.add(i);
+          columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema());
+          break;
+        }
+      }
+    }
+
+    // hash partitioner to dispatch rows by bucket column
+    this.partitioner =
+        new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumOfRanges());
+  }
+
+
+  /**
+   * initialize partitioner for sort column ranges
+   */
+  private void initializeSortColumnRangesPartitioner() {
+    // convert user specified sort-column ranges
+    SortColumnRangeInfo sortColumnRangeInfo = configuration.getSortColumnRangeInfo();
+    int rangeValueCnt = sortColumnRangeInfo.getUserSpecifiedRanges().length;
+    CarbonRow[] convertedSortColumnRanges = new CarbonRow[rangeValueCnt];
+    for (int i = 0; i < rangeValueCnt; i++) {
+      Object[] fakeOriginRow = new Object[configuration.getDataFields().length];
+      String[] oneBound = StringUtils.splitPreserveAllTokens(
+          sortColumnRangeInfo.getUserSpecifiedRanges()[i], sortColumnRangeInfo.getSeparator(), -1);
+      // set the corresponding sort column
+      int j = 0;
+      for (int colIdx : sortColumnRangeInfo.getSortColumnIndex()) {
+        fakeOriginRow[colIdx] = oneBound[j++];
+      }
+      CarbonRow fakeCarbonRow = new CarbonRow(fakeOriginRow);
+      convertFakeRow(fakeCarbonRow, sortColumnRangeInfo);
+      convertedSortColumnRanges[i] = fakeCarbonRow;
+    }
+    // sort the range bounds (sort in carbon is a little different from what we think)
+    Arrays.sort(convertedSortColumnRanges,
+        new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
+            sortColumnRangeInfo.getIsSortColumnNoDict()));
+
+    // range partitioner to dispatch rows by sort columns
+    this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges,
+        new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
+            sortColumnRangeInfo.getIsSortColumnNoDict()));
+  }
+
+  // only convert sort column fields
+  private void convertFakeRow(CarbonRow fakeRow, SortColumnRangeInfo sortColumnRangeInfo) {
+    FieldConverter[] fieldConverters = converters.get(0).getFieldConverters();
+    BadRecordLogHolder logHolder = new BadRecordLogHolder();
+    logHolder.setLogged(false);
+    for (int colIdx : sortColumnRangeInfo.getSortColumnIndex()) {
+      fieldConverters[colIdx].convert(fakeRow, logHolder);
+    }
   }
 
   /**
@@ -102,6 +192,10 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
     while (rowBatch.hasNext()) {
       CarbonRow convertRow = localConverter.convert(rowBatch.next());
+      if (isSortColumnRangeEnabled || isBucketColumnEnabled) {
+        short rangeNumber = (short) partitioner.getPartition(convertRow);
+        convertRow.setRangeId(rangeNumber);
+      }
       rowBatch.setPreviousRow(convertRow);
     }
     rowCounter.getAndAdd(rowBatch.getSize());
@@ -134,6 +228,12 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   }
 
   @Override protected String getStepName() {
-    return "Data Converter";
+    if (isBucketColumnEnabled) {
+      return "Data Converter with Bucketing";
+    } else if (isSortColumnRangeEnabled) {
+      return "Data Converter with sort column range";
+    } else {
+      return "Data Converter";
+    }
   }
 }
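
RangePartitionerImpl itself is not part of this hunk. Conceptually, a range partitioner keeps the converted bounds sorted and maps each row to the index of the first bound greater than its sort key, which a binary search gives directly. The sketch below shows the idea on plain int keys; it is an assumption about the mechanism, since the real implementation compares converted CarbonRows with RawRowComparator.

import java.util.Arrays;

public class RangePartitionSketch {
  // Returns a partition id in [0, bounds.length]; bounds must be sorted.
  static int getPartition(int[] bounds, int key) {
    int idx = Arrays.binarySearch(bounds, key);
    // an exact hit goes to the range starting at that bound; misses are
    // decoded from binarySearch's (-(insertionPoint) - 1) return value
    return idx >= 0 ? idx + 1 : -(idx + 1);
  }

  public static void main(String[] args) {
    int[] bounds = { 10, 20, 30 };   // stands in for user-specified sort column bounds
    for (int key : new int[] { 5, 10, 15, 25, 99 }) {
      System.out.println(key + " -> range " + getPartition(bounds, key));
    }
    // 5 -> 0, 10 -> 1, 15 -> 1, 25 -> 2, 99 -> 3
  }
}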

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
deleted file mode 100644
index a1181c9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.loading.BadRecordsLogger;
-import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
-import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.converter.RowConverter;
-import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
-import org.apache.carbondata.processing.loading.partition.Partitioner;
-import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
-
-/**
- * Replace row data fields with dictionary values if column is configured dictionary encoded.
- * And nondictionary columns as well as complex columns will be converted to byte[].
- */
-public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep {
-
-  private List<RowConverter> converters;
-
-  private Partitioner<Object[]> partitioner;
-
-  private BadRecordsLogger badRecordLogger;
-
-  public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration,
-      AbstractDataLoadProcessorStep child) {
-    super(configuration, child);
-  }
-
-  @Override
-  public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
-  @Override
-  public void initialize() throws IOException {
-    super.initialize();
-    child.initialize();
-    converters = new ArrayList<>();
-    badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
-    RowConverter converter =
-        new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
-    configuration.setCardinalityFinder(converter);
-    converters.add(converter);
-    converter.initialize();
-    List<Integer> indexes = new ArrayList<>();
-    List<ColumnSchema> columnSchemas = new ArrayList<>();
-    DataField[] inputDataFields = getOutput();
-    BucketingInfo bucketingInfo = configuration.getBucketingInfo();
-    for (int i = 0; i < inputDataFields.length; i++) {
-      for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) {
-        if (inputDataFields[i].getColumn().getColName()
-            .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) {
-          indexes.add(i);
-          columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema());
-          break;
-        }
-      }
-    }
-    partitioner =
-        new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumberOfBuckets());
-  }
-
-  /**
-   * Create the iterator using child iterator.
-   *
-   * @param childIter
-   * @return new iterator with step specific processing.
-   */
-  @Override
-  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
-    return new CarbonIterator<CarbonRowBatch>() {
-      RowConverter localConverter;
-      private boolean first = true;
-      @Override public boolean hasNext() {
-        if (first) {
-          first = false;
-          localConverter = converters.get(0).createCopyForNewThread();
-          converters.add(localConverter);
-        }
-        return childIter.hasNext();
-      }
-
-      @Override public CarbonRowBatch next() {
-        return processRowBatch(childIter.next(), localConverter);
-      }
-    };
-  }
-
-  /**
-   * Process the batch of rows as per the step logic.
-   *
-   * @param rowBatch
-   * @return processed row.
-   */
-  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
-    while (rowBatch.hasNext()) {
-      CarbonRow row = rowBatch.next();
-      short bucketNumber = (short) partitioner.getPartition(row.getData());
-      CarbonRow convertRow = localConverter.convert(row);
-      convertRow.bucketNumber = bucketNumber;
-      rowBatch.setPreviousRow(convertRow);
-    }
-    rowCounter.getAndAdd(rowBatch.getSize());
-    // reuse the origin batch
-    rowBatch.rewind();
-    return rowBatch;
-  }
-
-  @Override
-  protected CarbonRow processRow(CarbonRow row) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void close() {
-    if (!closed) {
-      super.close();
-      if (null != badRecordLogger) {
-        badRecordLogger.closeStreams();
-        CarbonBadRecordUtil.renameBadRecord(configuration);
-      }
-      if (converters != null) {
-        for (RowConverter converter : converters) {
-          converter.finish();
-        }
-      }
-    }
-  }
-  @Override protected String getStepName() {
-    return "Data Converter with Bucketing";
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 58009af..0467b11 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -17,7 +17,15 @@
 package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -25,6 +33,7 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
@@ -92,7 +101,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
               System.currentTimeMillis());
+      ExecutorService rangeExecutorService = Executors.newFixedThreadPool(iterators.length,
+          new CarbonThreadFactory("WriterForwardPool: " + tableName));
+      List<Future<Void>> rangeExecutorServiceSubmitList = new ArrayList<>(iterators.length);
       int i = 0;
+      // do this concurrently
       for (Iterator<CarbonRowBatch> iterator : iterators) {
         String[] storeLocation = getStoreLocation(tableIdentifier);
 
@@ -112,9 +125,19 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
         if (!rowsNotExist) {
           finish(dataHandler);
         }
+        rangeExecutorServiceSubmitList.add(
+            rangeExecutorService.submit(new WriterForwarder(iterator, tableIdentifier, i)));
         i++;
       }
-
+      try {
+        rangeExecutorService.shutdown();
+        rangeExecutorService.awaitTermination(2, TimeUnit.DAYS);
+        for (int j = 0; j < rangeExecutorServiceSubmitList.size(); j++) {
+          rangeExecutorServiceSubmitList.get(j).get();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        throw new CarbonDataWriterException(e);
+      }
     } catch (CarbonDataWriterException e) {
       LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
       throw new CarbonDataLoadingException(
@@ -130,6 +153,51 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     return "Data Writer";
   }
 
+  /**
+   * Forwards the rows of one range (identified by its range id) to the writer.
+   */
+  private final class WriterForwarder implements Callable<Void> {
+    private Iterator<CarbonRowBatch> insideRangeIterator;
+    private CarbonTableIdentifier tableIdentifier;
+    private int rangeId;
+
+    public WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator,
+        CarbonTableIdentifier tableIdentifier, int rangeId) {
+      this.insideRangeIterator = insideRangeIterator;
+      this.tableIdentifier = tableIdentifier;
+      this.rangeId = rangeId;
+    }
+
+    @Override public Void call() throws Exception {
+      LOGGER.info("Process writer forward for table " + tableIdentifier.getTableName()
+          + ", range: " + rangeId);
+      processRange(insideRangeIterator, tableIdentifier, rangeId);
+      return null;
+    }
+  }
+
+  private void processRange(Iterator<CarbonRowBatch> insideRangeIterator,
+      CarbonTableIdentifier tableIdentifier, int rangeId) {
+    String[] storeLocation = getStoreLocation(tableIdentifier);
+
+    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+        .createCarbonFactDataHandlerModel(configuration, storeLocation, rangeId, 0);
+    CarbonFactHandler dataHandler = null;
+    boolean rowsNotExist = true;
+    while (insideRangeIterator.hasNext()) {
+      if (rowsNotExist) {
+        rowsNotExist = false;
+        dataHandler = CarbonFactHandlerFactory
+            .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+        dataHandler.initialise();
+      }
+      processBatch(insideRangeIterator.next(), dataHandler);
+    }
+    if (!rowsNotExist) {
+      finish(dataHandler);
+    }
+  }
+
   public void finish(CarbonFactHandler dataHandler) {
     CarbonTableIdentifier tableIdentifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
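
The writer step above moves per-range writing onto a thread pool: one Callable per range iterator, then shutdown/awaitTermination, then Future.get() on each task so a failure in any range surfaces as an exception. Below is a minimal sketch of that submit/await/rethrow pattern, with the actual writing replaced by a placeholder println:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class WriterForwardSketch {
  public static void main(String[] args) throws Exception {
    int numRanges = 4;
    ExecutorService pool = Executors.newFixedThreadPool(numRanges);
    List<Future<Void>> futures = new ArrayList<>(numRanges);
    for (int rangeId = 0; rangeId < numRanges; rangeId++) {
      final int id = rangeId;
      futures.add(pool.submit(new Callable<Void>() {
        @Override public Void call() {
          // stands in for processRange(iterator, tableIdentifier, rangeId)
          System.out.println("writing range " + id);
          return null;
        }
      }));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    // Future.get() rethrows any exception raised inside a worker
    for (Future<Void> f : futures) {
      try {
        f.get();
      } catch (ExecutionException e) {
        throw new RuntimeException("range writer failed", e.getCause());
      }
    }
  }
}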

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index a4ac0ea..1a839a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -119,9 +119,10 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
   }
 
   private List<File> getFilesToMergeSort() {
+    final int rangeId = sortParameters.getRangeId();
     FileFilter fileFilter = new FileFilter() {
       public boolean accept(File pathname) {
-        return pathname.getName().startsWith(tableName);
+        return pathname.getName().startsWith(tableName + '_' + rangeId);
       }
     };
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index c7efbd9..a5caf7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -212,9 +212,9 @@ public class SortDataRows {
       // create new file and choose folder randomly
       String[] tmpLocation = parameters.getTempFileLocation();
       String locationChosen = tmpLocation[new Random().nextInt(tmpLocation.length)];
-      File file = new File(
-          locationChosen + File.separator + parameters.getTableName() +
-              System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+      File file = new File(locationChosen + File.separator + parameters.getTableName()
+          + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+          + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
       writeDataToFile(recordHolderList, this.entryCount, file);
     }
 
@@ -325,8 +325,9 @@ public class SortDataRows {
         String[] tmpFileLocation = parameters.getTempFileLocation();
         String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)];
         File sortTempFile = new File(
-            locationChosen + File.separator + parameters.getTableName() + System
-                .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+            locationChosen + File.separator + parameters.getTableName()
+                + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+                + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
         writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile);
         // add sort temp filename to and arrayList. When the list size reaches 20 then
         // intermediate merging of sort temp files will be triggered

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
index 9c995a5..0e3f6bd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
@@ -94,9 +94,9 @@ public class SortIntermediateFileMerger {
   private void startIntermediateMerging(File[] intermediateFiles) {
     int index = new Random().nextInt(parameters.getTempFileLocation().length);
     String chosenTempDir = parameters.getTempFileLocation()[index];
-    File file = new File(
-        chosenTempDir + File.separator + parameters.getTableName() + System
-            .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
+    File file = new File(chosenTempDir + File.separator + parameters.getTableName()
+        + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+        + CarbonCommonConstants.MERGERD_EXTENSION);
     IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
     mergerTask.add(executorService.submit(merger));
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 4d31f87..4d333ed 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -119,6 +119,7 @@ public class SortParameters implements Serializable {
   private int numberOfCores;
 
   private int batchSortSizeinMb;
+  private int rangeId = 0;
 
   public SortParameters getCopy() {
     SortParameters parameters = new SortParameters();
@@ -147,6 +148,7 @@ public class SortParameters implements Serializable {
     parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
     parameters.numberOfCores = numberOfCores;
     parameters.batchSortSizeinMb = batchSortSizeinMb;
+    parameters.rangeId = rangeId;
     return parameters;
   }
 
@@ -429,6 +431,14 @@ public class SortParameters implements Serializable {
     return parameters;
   }
 
+  public int getRangeId() {
+    return rangeId;
+  }
+
+  public void setRangeId(int rangeId) {
+    this.rangeId = rangeId;
+  }
+
   /**
    * this method will set the boolean mapping for no dictionary sort columns
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index b795696..c0acadd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -167,12 +167,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
     blockletProcessingCount = new AtomicInteger(0);
     producerExecutorService = Executors.newFixedThreadPool(numberOfCores,
-        new CarbonThreadFactory("ProducerPool:" + model.getTableName()));
+        new CarbonThreadFactory("ProducerPool:" + model.getTableName()
+            + ", range: " + model.getBucketId()));
     producerExecutorServiceTaskList =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     LOGGER.info("Initializing writer executors");
     consumerExecutorService = Executors
-        .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName()));
+        .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName()
+            + ", range: " + model.getBucketId()));
     consumerExecutorServiceTaskList = new ArrayList<>(1);
     semaphore = new Semaphore(numberOfCores);
     tablePageList = new TablePageList();