You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:31 UTC

[15/20] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
new file mode 100644
index 0000000..f5f112c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.row;
+
+import java.util.NoSuchElementException;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+
+
+/**
+ * Batch of rows.
+ */
+public class CarbonRowBatch extends CarbonIterator<CarbonRow> {
+
+  private CarbonRow[] rowBatch;
+
+  private int size = 0;
+
+  private int index = 0;
+
+  public CarbonRowBatch(int batchSize) {
+    this.rowBatch = new CarbonRow[batchSize];
+  }
+
+  public void addRow(CarbonRow carbonRow) {
+    rowBatch[size++] = carbonRow;
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  @Override public boolean hasNext() {
+    return index < size;
+  }
+
+  @Override
+  public CarbonRow next() throws NoSuchElementException {
+    if (hasNext()) {
+      return rowBatch[index++];
+    }
+    throw new NoSuchElementException("no more elements to iterate");
+  }
+
+  @Override public void remove() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonSortBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonSortBatch.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonSortBatch.java
new file mode 100644
index 0000000..5d39145
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonSortBatch.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.row;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+
+/**
+ * Batch of sorted rows which are ready to be processed by
+ */
+public class CarbonSortBatch extends CarbonRowBatch {
+
+  private UnsafeSingleThreadFinalSortFilesMerger iterator;
+
+  public CarbonSortBatch(UnsafeSingleThreadFinalSortFilesMerger iterator) {
+    super(0);
+    this.iterator = iterator;
+  }
+
+  @Override public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override public CarbonRow next() {
+    return new CarbonRow(iterator.next());
+  }
+
+  @Override public void close() {
+    iterator.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/AbstractMergeSorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/AbstractMergeSorter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/AbstractMergeSorter.java
new file mode 100644
index 0000000..550fe70
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/AbstractMergeSorter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort;
+
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.sort.impl.ThreadStatusObserver;
+
+/**
+ * The class defines the common methods used in across various type of sort
+ */
+public abstract class AbstractMergeSorter implements Sorter {
+  /**
+   * instance of thread status observer
+   */
+  protected ThreadStatusObserver threadStatusObserver;
+
+  /**
+   * Below method will be used to check error in exception
+   */
+  public void checkError() {
+    if (threadStatusObserver.getThrowable() != null) {
+      if (threadStatusObserver.getThrowable() instanceof CarbonDataLoadingException) {
+        throw (CarbonDataLoadingException) threadStatusObserver.getThrowable();
+      } else {
+        throw new CarbonDataLoadingException(threadStatusObserver.getThrowable());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
new file mode 100644
index 0000000..23179fa
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Sort scope options
+ */
+public class SortScopeOptions {
+
+  public static SortScope getSortScope(String sortScope) {
+    if (sortScope == null) {
+      sortScope = CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT;
+    }
+    switch (sortScope.toUpperCase()) {
+      case "BATCH_SORT":
+        return SortScope.BATCH_SORT;
+      case "LOCAL_SORT":
+        return SortScope.LOCAL_SORT;
+      case "GLOBAL_SORT":
+        return SortScope.GLOBAL_SORT;
+      case "NO_SORT":
+        return SortScope.NO_SORT;
+      default:
+        return SortScope.LOCAL_SORT;
+    }
+  }
+
+  public static boolean isValidSortOption(String sortScope) {
+    return CarbonUtil.isValidSortOption(sortScope);
+  }
+
+  public enum SortScope {
+    NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
new file mode 100644
index 0000000..9665487
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+public class SortStepRowUtil {
+  public static Object[] convertRow(Object[] data, SortParameters parameters) {
+    int measureCount = parameters.getMeasureColCount();
+    int dimensionCount = parameters.getDimColCount();
+    int complexDimensionCount = parameters.getComplexDimColCount();
+    int noDictionaryCount = parameters.getNoDictionaryCount();
+    boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+
+    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
+
+    Object[] holder = new Object[3];
+    int index = 0;
+    int nonDicIndex = 0;
+    int allCount = 0;
+    int[] dim = new int[dimensionCount];
+    byte[][] nonDicArray = new byte[noDictionaryCount + complexDimensionCount][];
+    Object[] measures = new Object[measureCount];
+    try {
+      // read dimension values
+      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+        if (isNoDictionaryDimensionColumn[i]) {
+          nonDicArray[nonDicIndex++] = (byte[]) data[i];
+        } else {
+          dim[index++] = (int) data[allCount];
+        }
+        allCount++;
+      }
+
+      for (int i = 0; i < complexDimensionCount; i++) {
+        nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
+        allCount++;
+      }
+
+      index = 0;
+
+      // read measure values
+      for (int i = 0; i < measureCount; i++) {
+        measures[index++] = data[allCount];
+        allCount++;
+      }
+
+      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
+
+      // increment number if record read
+    } catch (Exception e) {
+      throw new RuntimeException("Problem while converting row ", e);
+    }
+
+    //return out row
+    return holder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/Sorter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/Sorter.java
new file mode 100644
index 0000000..9a47e50
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/Sorter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+/**
+ * This interface sorts all the data of iterators.
+ * The life cycle of this interface is initialize -> sort -> close
+ */
+public interface Sorter {
+
+  /**
+   * Initialize sorter with sort parameters.
+   *
+   * @param sortParameters
+   */
+  void initialize(SortParameters sortParameters);
+
+  /**
+   * Sorts the data of all iterators, this iterators can be
+   * read parallely depends on implementation.
+   *
+   * @param iterators array of iterators to read data.
+   * @return
+   * @throws CarbonDataLoadingException
+   */
+  Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException;
+
+  /**
+   * Close resources
+   */
+  void close();
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
new file mode 100644
index 0000000..a8f0282
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.loading.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class SorterFactory {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SorterFactory.class.getName());
+
+  public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
+    boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+            CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
+    SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+    Sorter sorter;
+    if (offheapsort) {
+      if (configuration.getBucketingInfo() != null) {
+        sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
+            configuration.getBucketingInfo());
+      } else {
+        sorter = new UnsafeParallelReadMergeSorterImpl(counter);
+      }
+    } else {
+      if (configuration.getBucketingInfo() != null) {
+        sorter =
+            new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo());
+      } else {
+        sorter = new ParallelReadMergeSorterImpl(counter);
+      }
+    }
+    if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
+      if (configuration.getBucketingInfo() == null) {
+        sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
+      } else {
+        LOGGER.warn(
+            "Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass()
+                .getName());
+      }
+    }
+    return sorter;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
new file mode 100644
index 0000000..6e43fcb
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * First it sorts the data and write to temp files. These temp files will be merge sorted to get
+ * final merge sort result.
+ */
+public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private SortIntermediateFileMerger intermediateFileMerger;
+
+  private SingleThreadFinalSortFilesMerger finalMerger;
+
+  private AtomicLong rowCounter;
+
+  public ParallelReadMergeSorterImpl(AtomicLong rowCounter) {
+    this.rowCounter = rowCounter;
+  }
+
+  @Override
+  public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+    intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
+    String[] storeLocations =
+        CarbonDataProcessorUtil.getLocalDataFolderLocation(
+            sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
+            sortParameters.getSegmentId() + "", false, false);
+    // Set the data file location
+    String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
+        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    finalMerger =
+        new SingleThreadFinalSortFilesMerger(dataFolderLocations, sortParameters.getTableName(),
+            sortParameters.getDimColCount(),
+            sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
+            sortParameters.getNoDictionaryCount(), sortParameters.getMeasureDataType(),
+            sortParameters.getNoDictionaryDimnesionColumn(),
+            sortParameters.getNoDictionarySortColumn());
+  }
+
+  @Override
+  public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    SortDataRows sortDataRow = new SortDataRows(sortParameters, intermediateFileMerger);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      sortDataRow.initialize();
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(executorService);
+
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.execute(
+            new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
+                threadStatusObserver));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRow, sortParameters);
+    } catch (Exception e) {
+      checkError();
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    checkError();
+    try {
+      intermediateFileMerger.finish();
+      intermediateFileMerger = null;
+      finalMerger.startFinalMerge();
+    } catch (CarbonDataWriterException e) {
+      throw new CarbonDataLoadingException(e);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    // Creates the iterator to read from merge sorter.
+    Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
+
+      @Override
+      public boolean hasNext() {
+        return finalMerger.hasNext();
+      }
+
+      @Override
+      public CarbonRowBatch next() {
+        int counter = 0;
+        CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+        while (finalMerger.hasNext() && counter < batchSize) {
+          rowBatch.addRow(new CarbonRow(finalMerger.next()));
+          counter++;
+        }
+        return rowBatch;
+      }
+    };
+    return new Iterator[] { batchIterator };
+  }
+
+  @Override public void close() {
+    if (intermediateFileMerger != null) {
+      intermediateFileMerger.close();
+    }
+  }
+
+  /**
+   * Below method will be used to process data to next step
+   */
+  private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      // start sorting
+      sortDataRows.startSorting();
+
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
+              System.currentTimeMillis());
+      return false;
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
+   * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
+   */
+  private static class SortIteratorThread implements Runnable {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private SortDataRows sortDataRows;
+
+    private Object[][] buffer;
+
+    private AtomicLong rowCounter;
+
+    private ThreadStatusObserver observer;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows,
+        int batchSize, AtomicLong rowCounter, ThreadStatusObserver observer) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.buffer = new Object[batchSize][];
+      this.rowCounter = rowCounter;
+      this.observer = observer;
+
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          int i = 0;
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              buffer[i++] = row.getData();
+            }
+          }
+          if (i > 0) {
+            sortDataRows.addRowBatch(buffer, i);
+            rowCounter.getAndAdd(i);
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        observer.notifyFailed(e);
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
new file mode 100644
index 0000000..51db3a0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * First it sorts the data and write to temp files. These temp files will be merge sorted to get
+ * final merge sort result.
+ * This step is specifically for bucketing, it sorts each bucket data separately and write to
+ * temp files.
+ */
+public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ParallelReadMergeSorterWithBucketingImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private SortIntermediateFileMerger[] intermediateFileMergers;
+
+  private BucketingInfo bucketingInfo;
+
+  private int sortBufferSize;
+
+  private AtomicLong rowCounter;
+
+  public ParallelReadMergeSorterWithBucketingImpl(AtomicLong rowCounter,
+      BucketingInfo bucketingInfo) {
+    this.rowCounter = rowCounter;
+    this.bucketingInfo = bucketingInfo;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+    int buffer = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
+    sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
+    if (sortBufferSize < 100) {
+      sortBufferSize = 100;
+    }
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()];
+    intermediateFileMergers =
+        new SortIntermediateFileMerger[sortDataRows.length];
+    try {
+      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+        SortParameters parameters = sortParameters.getCopy();
+        parameters.setPartitionID(i + "");
+        setTempLocation(parameters);
+        parameters.setBufferSize(sortBufferSize);
+        intermediateFileMergers[i] = new SortIntermediateFileMerger(parameters);
+        sortDataRows[i] = new SortDataRows(parameters, intermediateFileMergers[i]);
+        sortDataRows[i].initialize();
+      }
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(executorService);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
+            this.threadStatusObserver));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRows, sortParameters);
+    } catch (Exception e) {
+      checkError();
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    checkError();
+    try {
+      for (int i = 0; i < intermediateFileMergers.length; i++) {
+        intermediateFileMergers[i].finish();
+      }
+    } catch (CarbonDataWriterException e) {
+      throw new CarbonDataLoadingException(e);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
+    for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+      batchIterator[i] = new MergedDataIterator(String.valueOf(i), batchSize);
+    }
+
+    return batchIterator;
+  }
+
+  private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
+    String[] storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()), bucketId,
+            sortParameters.getSegmentId() + "", false, false);
+    // Set the data file location
+    String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
+        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
+            sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
+            sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
+            sortParameters.getMeasureDataType(), sortParameters.getNoDictionaryDimnesionColumn(),
+            this.sortParameters.getNoDictionarySortColumn());
+  }
+
+  @Override public void close() {
+    for (int i = 0; i < intermediateFileMergers.length; i++) {
+      intermediateFileMergers[i].close();
+    }
+  }
+
+  /**
+   * Below method will be used to process data to next step
+   */
+  private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows || sortDataRows.length == 0) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        // start sorting
+        sortDataRows[i].startSorting();
+      }
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  private void setTempLocation(SortParameters parameters) {
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(),
+            parameters.getTableName(), parameters.getTaskNo(),
+            parameters.getPartitionID(), parameters.getSegmentId(), false, false);
+    String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
+        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    parameters.setTempFileLocation(tmpLocs);
+  }
+
+  /**
+   * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
+   */
+  private static class SortIteratorThread implements Runnable {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private SortDataRows[] sortDataRows;
+
+    private AtomicLong rowCounter;
+
+    private ThreadStatusObserver threadStatusObserver;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows,
+        AtomicLong rowCounter, ThreadStatusObserver observer) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.rowCounter = rowCounter;
+      this.threadStatusObserver = observer;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          int i = 0;
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
+              synchronized (sortDataRow) {
+                sortDataRow.addRow(row.getData());
+                rowCounter.getAndAdd(1);
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
+      }
+    }
+
+  }
+
+  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private String partitionId;
+
+    private int batchSize;
+
+    private boolean firstRow = true;
+
+    public MergedDataIterator(String partitionId, int batchSize) {
+      this.partitionId = partitionId;
+      this.batchSize = batchSize;
+    }
+
+    private SingleThreadFinalSortFilesMerger finalMerger;
+
+    @Override public boolean hasNext() {
+      if (firstRow) {
+        firstRow = false;
+        finalMerger = getFinalMerger(partitionId);
+        finalMerger.startFinalMerge();
+      }
+      return finalMerger.hasNext();
+    }
+
+    @Override public CarbonRowBatch next() {
+      int counter = 0;
+      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+      while (finalMerger.hasNext() && counter < batchSize) {
+        rowBatch.addRow(new CarbonRow(finalMerger.next()));
+        counter++;
+      }
+      return rowBatch;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
new file mode 100644
index 0000000..ed35a96
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.util.concurrent.ExecutorService;
+
+public class ThreadStatusObserver {
+
+  /**
+   * lock object
+   */
+  private Object lock = new Object();
+
+  private ExecutorService executorService;
+
+  private Throwable throwable;
+
+  public ThreadStatusObserver(ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  public void notifyFailed(Throwable throwable) {
+    // Only the first failing thread should call for shutting down the executor service and
+    // should assign the throwable object else the actual cause for failure can be overridden as
+    // all the running threads will throw interrupted exception on calling shutdownNow and
+    // will override the throwable object
+    if (null == this.throwable) {
+      synchronized (lock) {
+        if (null == this.throwable) {
+          executorService.shutdownNow();
+          this.throwable = throwable;
+        }
+      }
+    }
+  }
+
+  public Throwable getThrowable() {
+    return throwable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
new file mode 100644
index 0000000..c5579d9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.row.CarbonSortBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * It sorts data in batches and send to the next step.
+ */
+public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeBatchParallelReadMergeSorterImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private ExecutorService executorService;
+
+  private AtomicLong rowCounter;
+
+  public UnsafeBatchParallelReadMergeSorterImpl(AtomicLong rowCounter) {
+    this.rowCounter = rowCounter;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    this.executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+    final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length,
+        this.threadStatusObserver);
+
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.execute(
+            new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter,
+                this.threadStatusObserver));
+      }
+    } catch (Exception e) {
+      checkError();
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    checkError();
+    // Creates the iterator to read from merge sorter.
+    Iterator<CarbonSortBatch> batchIterator = new CarbonIterator<CarbonSortBatch>() {
+
+      @Override public boolean hasNext() {
+        return sortBatchHolder.hasNext();
+      }
+
+      @Override public CarbonSortBatch next() {
+        return new CarbonSortBatch(sortBatchHolder.next());
+      }
+    };
+    return new Iterator[] { batchIterator };
+  }
+
+  @Override public void close() {
+    executorService.shutdown();
+    try {
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      LOGGER.error(e);
+    }
+  }
+
+  /**
+   * This thread iterates the iterator and adds the rows
+   */
+  private static class SortIteratorThread implements Runnable {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private SortBatchHolder sortDataRows;
+
+    private Object[][] buffer;
+
+    private AtomicLong rowCounter;
+
+    private ThreadStatusObserver threadStatusObserver;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortBatchHolder sortDataRows,
+        int batchSize, AtomicLong rowCounter, ThreadStatusObserver threadStatusObserver) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.buffer = new Object[batchSize][];
+      this.rowCounter = rowCounter;
+      this.threadStatusObserver = threadStatusObserver;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          int i = 0;
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              buffer[i++] = row.getData();
+            }
+          }
+          if (i > 0) {
+            synchronized (sortDataRows) {
+              sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
+              rowCounter.getAndAdd(i);
+              if (!sortDataRows.getSortDataRow().canAdd()) {
+                sortDataRows.finish(false);
+                sortDataRows.createSortDataRows();
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
+      } finally {
+        sortDataRows.finishThread();
+      }
+    }
+
+  }
+
+  private static class SortBatchHolder
+      extends CarbonIterator<UnsafeSingleThreadFinalSortFilesMerger> {
+
+    private SortParameters sortParameters;
+
+    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+    private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
+
+    private UnsafeSortDataRows sortDataRow;
+
+    private final BlockingQueue<UnsafeSingleThreadFinalSortFilesMerger> mergerQueue;
+
+    private AtomicInteger iteratorCount;
+
+    private int batchCount;
+
+    private ThreadStatusObserver threadStatusObserver;
+
+    private final Object lock = new Object();
+
+    public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
+        ThreadStatusObserver threadStatusObserver) {
+      this.sortParameters = sortParameters.getCopy();
+      this.iteratorCount = new AtomicInteger(numberOfThreads);
+      this.mergerQueue = new LinkedBlockingQueue<>(1);
+      this.threadStatusObserver = threadStatusObserver;
+      createSortDataRows();
+    }
+
+    private void createSortDataRows() {
+      int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+      setTempLocation(sortParameters);
+      this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
+          sortParameters.getTempFileLocation());
+      unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
+      sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger,
+          inMemoryChunkSizeInMB);
+
+      try {
+        sortDataRow.initialize();
+      } catch (MemoryException e) {
+        throw new CarbonDataLoadingException(e);
+      }
+      batchCount++;
+    }
+
+    private void setTempLocation(SortParameters parameters) {
+      String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+          .getLocalDataFolderLocation(parameters.getDatabaseName(),
+            parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
+            parameters.getSegmentId(), false, false);
+      String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
+          File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+      parameters.setTempFileLocation(tempDirs);
+    }
+
+    @Override public UnsafeSingleThreadFinalSortFilesMerger next() {
+      try {
+        UnsafeSingleThreadFinalSortFilesMerger unsafeSingleThreadFinalSortFilesMerger =
+            mergerQueue.take();
+        if (unsafeSingleThreadFinalSortFilesMerger.isStopProcess()) {
+          throw new RuntimeException(threadStatusObserver.getThrowable());
+        }
+        return unsafeSingleThreadFinalSortFilesMerger;
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public UnsafeSortDataRows getSortDataRow() {
+      return sortDataRow;
+    }
+
+    public void finish(boolean isFinalAttempt) {
+      try {
+        // if the mergerQue is empty and some CarbonDataLoadingException exception has occurred
+        // then set stop process to true in the finalmerger instance
+        if (mergerQueue.isEmpty() && threadStatusObserver != null
+            && threadStatusObserver.getThrowable() != null && threadStatusObserver
+            .getThrowable() instanceof CarbonDataLoadingException) {
+          finalMerger.setStopProcess(true);
+          if (isFinalAttempt) {
+            iteratorCount.decrementAndGet();
+          }
+          mergerQueue.put(finalMerger);
+          return;
+        }
+        processRowToNextStep(sortDataRow, sortParameters);
+        unsafeIntermediateFileMerger.finish();
+        List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
+        finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+            unsafeIntermediateFileMerger.getMergedPages());
+        unsafeIntermediateFileMerger.close();
+        if (isFinalAttempt) {
+          iteratorCount.decrementAndGet();
+        }
+        mergerQueue.put(finalMerger);
+        sortDataRow = null;
+        unsafeIntermediateFileMerger = null;
+        finalMerger = null;
+      } catch (CarbonDataWriterException e) {
+        throw new CarbonDataLoadingException(e);
+      } catch (CarbonSortKeyAndGroupByException e) {
+        throw new CarbonDataLoadingException(e);
+      } catch (InterruptedException e) {
+        // if fails to put in queue because of interrupted exception, we can offer to free the main
+        // thread from waiting.
+        if (finalMerger != null) {
+          finalMerger.setStopProcess(true);
+          boolean offered = mergerQueue.offer(finalMerger);
+          if (!offered) {
+            throw new CarbonDataLoadingException(e);
+          }
+        }
+        throw new CarbonDataLoadingException(e);
+      }
+    }
+
+    public void finishThread() {
+      synchronized (lock) {
+        if (iteratorCount.get() <= 1) {
+          finish(true);
+        } else {
+          iteratorCount.decrementAndGet();
+        }
+      }
+    }
+
+    public boolean hasNext() {
+      return iteratorCount.get() > 0 || !mergerQueue.isEmpty();
+    }
+
+    /**
+     * Below method will be used to process data to next step
+     */
+    private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
+        throws CarbonDataLoadingException {
+      if (null == sortDataRows) {
+        LOGGER.info("Record Processed For table: " + parameters.getTableName());
+        LOGGER.info("Number of Records was Zero");
+        String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+        LOGGER.info(logMessage);
+        return false;
+      }
+
+      try {
+        // start sorting
+        sortDataRows.startSorting();
+
+        // check any more rows are present
+        LOGGER.info("Record Processed For table: " + parameters.getTableName());
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+            .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+            .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
+                System.currentTimeMillis());
+        return false;
+      } catch (InterruptedException e) {
+        throw new CarbonDataLoadingException(e);
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
new file mode 100644
index 0000000..1a2f704
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * First it sorts the data and write to temp files. These temp files will be merge sorted to get
+ * final merge sort result.
+ */
+public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeParallelReadMergeSorterImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
+
+  private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+  private AtomicLong rowCounter;
+
+  public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) {
+    this.rowCounter = rowCounter;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+    unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
+
+    finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
+        sortParameters.getTempFileLocation());
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+    UnsafeSortDataRows sortDataRow =
+        new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      sortDataRow.initialize();
+    } catch (MemoryException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(executorService);
+
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.execute(
+            new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
+                this.threadStatusObserver));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRow, sortParameters);
+    } catch (Exception e) {
+      checkError();
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    checkError();
+    try {
+      unsafeIntermediateFileMerger.finish();
+      List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
+      finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+          unsafeIntermediateFileMerger.getMergedPages());
+    } catch (CarbonDataWriterException e) {
+      throw new CarbonDataLoadingException(e);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    // Creates the iterator to read from merge sorter.
+    Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
+
+      @Override public boolean hasNext() {
+        return finalMerger.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        int counter = 0;
+        CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+        while (finalMerger.hasNext() && counter < batchSize) {
+          rowBatch.addRow(new CarbonRow(finalMerger.next()));
+          counter++;
+        }
+        return rowBatch;
+      }
+    };
+    return new Iterator[] { batchIterator };
+  }
+
+  @Override public void close() {
+    unsafeIntermediateFileMerger.close();
+    finalMerger.clear();
+  }
+
+  /**
+   * Below method will be used to process data to next step
+   */
+  private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      // start sorting
+      sortDataRows.startSorting();
+
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (InterruptedException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
+   * This thread iterates the iterator and adds the rows
+   */
+  private static class SortIteratorThread implements Runnable {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private UnsafeSortDataRows sortDataRows;
+
+    private Object[][] buffer;
+
+    private AtomicLong rowCounter;
+
+    private ThreadStatusObserver threadStatusObserver;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+        UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter,
+        ThreadStatusObserver threadStatusObserver) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.buffer = new Object[batchSize][];
+      this.rowCounter = rowCounter;
+      this.threadStatusObserver = threadStatusObserver;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          int i = 0;
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              buffer[i++] = row.getData();
+            }
+          }
+          if (i > 0) {
+            sortDataRows.addRowBatch(buffer, i);
+            rowCounter.getAndAdd(i);
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
new file mode 100644
index 0000000..3c48e4d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * First it sorts the data and write to temp files. These temp files will be merge sorted to get
+ * final merge sort result.
+ * This step is specifically for bucketing, it sorts each bucket data separately and write to
+ * temp files.
+ */
+public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(
+                UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private BucketingInfo bucketingInfo;
+
+  public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
+      BucketingInfo bucketingInfo) {
+    this.bucketingInfo = bucketingInfo;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
+    UnsafeIntermediateMerger[] intermediateFileMergers =
+        new UnsafeIntermediateMerger[sortDataRows.length];
+    int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+    inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / bucketingInfo.getNumberOfBuckets();
+    if (inMemoryChunkSizeInMB < 5) {
+      inMemoryChunkSizeInMB = 5;
+    }
+    try {
+      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+        SortParameters parameters = sortParameters.getCopy();
+        parameters.setPartitionID(i + "");
+        setTempLocation(parameters);
+        intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
+        sortDataRows[i] =
+            new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
+        sortDataRows[i].initialize();
+      }
+    } catch (MemoryException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(executorService);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, this
+            .threadStatusObserver));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRows, sortParameters);
+    } catch (Exception e) {
+      checkError();
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    checkError();
+    try {
+      for (int i = 0; i < intermediateFileMergers.length; i++) {
+        intermediateFileMergers[i].finish();
+      }
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
+    for (int i = 0; i < sortDataRows.length; i++) {
+      batchIterator[i] =
+          new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
+    }
+
+    return batchIterator;
+  }
+
+  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
+    String[] storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()), bucketId,
+            sortParameters.getSegmentId() + "", false, false);
+    // Set the data file location
+    String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
+        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation);
+  }
+
+  @Override public void close() {
+  }
+
+  /**
+   * Below method will be used to process data to next step
+   */
+  private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows || sortDataRows.length == 0) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        // start sorting
+        sortDataRows[i].startSorting();
+      }
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  private void setTempLocation(SortParameters parameters) {
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
+            parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(),
+            false, false);
+    String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
+        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    parameters.setTempFileLocation(tmpLoc);
+  }
+
+  /**
+   * This thread iterates the iterator and adds the rows to @{@link UnsafeSortDataRows}
+   */
+  private static class SortIteratorThread implements Runnable {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private UnsafeSortDataRows[] sortDataRows;
+
+    private ThreadStatusObserver threadStatusObserver;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+        UnsafeSortDataRows[] sortDataRows, ThreadStatusObserver threadStatusObserver) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.threadStatusObserver = threadStatusObserver;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          int i = 0;
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              UnsafeSortDataRows sortDataRow = sortDataRows[row.bucketNumber];
+              synchronized (sortDataRow) {
+                sortDataRow.addRow(row.getData());
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
+      }
+    }
+
+  }
+
+  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private String partitionId;
+
+    private int batchSize;
+
+    private boolean firstRow;
+
+    private UnsafeIntermediateMerger intermediateMerger;
+
+    public MergedDataIterator(String partitionId, int batchSize,
+        UnsafeIntermediateMerger intermediateMerger) {
+      this.partitionId = partitionId;
+      this.batchSize = batchSize;
+      this.intermediateMerger = intermediateMerger;
+      this.firstRow = true;
+    }
+
+    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+    @Override public boolean hasNext() {
+      if (firstRow) {
+        firstRow = false;
+        finalMerger = getFinalMerger(partitionId);
+        List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
+        finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+            intermediateMerger.getMergedPages());
+      }
+      return finalMerger.hasNext();
+    }
+
+    @Override public CarbonRowBatch next() {
+      int counter = 0;
+      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+      while (finalMerger.hasNext() && counter < batchSize) {
+        rowBatch.addRow(new CarbonRow(finalMerger.next()));
+        counter++;
+      }
+      return rowBatch;
+    }
+  }
+}