You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:21 UTC

[05/20] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
new file mode 100644
index 0000000..0883ae1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+
+/**
+ * Comparator for sort rows whose sort dimensions are read as plain int values through
+ * {@link NonDictionaryUtil#getDimension(int, Object[])} (surrogate keys of dimensions
+ * that are not high cardinality), so the integers can be compared directly.
+ */
+public class RowComparatorForNormalDims implements Comparator<Object[]> {
+  /**
+   * number of leading dimensions that take part in the comparison
+   */
+  private final int numberOfSortColumns;
+
+  /**
+   * RowComparatorForNormalDims Constructor
+   *
+   * @param numberOfSortColumns number of dimensions, counted from index 0, to compare
+   */
+  public RowComparatorForNormalDims(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  /**
+   * Compares two rows dimension by dimension and stops at the first dimension
+   * whose values differ.
+   *
+   * @param rowA first row
+   * @param rowB second row
+   * @return a negative value, zero or a positive value when rowA sorts before,
+   * equal to or after rowB
+   * @see Comparator#compare(Object, Object)
+   */
+  @Override
+  public int compare(Object[] rowA, Object[] rowB) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
+      int dimFieldA = NonDictionaryUtil.getDimension(i, rowA);
+      int dimFieldB = NonDictionaryUtil.getDimension(i, rowB);
+
+      // Integer.compare instead of "dimFieldA - dimFieldB": the subtraction overflows
+      // when the operands have opposite signs and are far apart, which flips the sign
+      // of the result and breaks the ordering.
+      int diff = Integer.compare(dimFieldA, dimFieldB);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+    // every compared dimension was equal
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
new file mode 100644
index 0000000..6d6ff94
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SingleThreadFinalSortFilesMerger.class.getName());
+
+  /**
+   * lockObject
+   */
+  private static final Object LOCKOBJECT = new Object();
+
+  /**
+   * fileCounter
+   */
+  private int fileCounter;
+
+  /**
+   * fileBufferSize
+   */
+  private int fileBufferSize;
+
+  /**
+   * recordHolderHeap
+   */
+  private AbstractQueue<SortTempFileChunkHolder> recordHolderHeapLocal;
+
+  /**
+   * tableName
+   */
+  private String tableName;
+
+  /**
+   * measureCount
+   */
+  private int measureCount;
+
+  /**
+   * dimensionCount
+   */
+  private int dimensionCount;
+
+  /**
+   * measure count
+   */
+  private int noDictionaryCount;
+
+  /**
+   * complexDimensionCount
+   */
+  private int complexDimensionCount;
+
+  /**
+   * tempFileLocation
+   */
+  private String[] tempFileLocation;
+
+  private DataType[] measureDataType;
+
+  /**
+   * below code is to check whether dimension
+   * is of no dictionary type or not
+   */
+  private boolean[] isNoDictionaryColumn;
+
+  private boolean[] isNoDictionarySortColumn;
+
+  /**
+   * Creates a merger for the sort temp files of one table.
+   *
+   * @param tempFileLocation         directories that are scanned for sort temp files
+   * @param tableName                only files whose name starts with this value are merged
+   * @param dimensionCount           handed to every SortTempFileChunkHolder
+   * @param complexDimensionCount    handed to every SortTempFileChunkHolder
+   * @param measureCount             handed to every SortTempFileChunkHolder
+   * @param noDictionaryCount        handed to every SortTempFileChunkHolder
+   * @param type                     measure data types, handed to every chunk holder
+   * @param isNoDictionaryColumn     per-dimension no-dictionary flags
+   * @param isNoDictionarySortColumn per-sort-column no-dictionary flags
+   */
+  public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String tableName,
+      int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
+      DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
+    // where the temp files live and how they are recognised
+    this.tableName = tableName;
+    this.tempFileLocation = tempFileLocation;
+
+    // column counts of a row
+    this.dimensionCount = dimensionCount;
+    this.complexDimensionCount = complexDimensionCount;
+    this.noDictionaryCount = noDictionaryCount;
+    this.measureCount = measureCount;
+
+    // per-column metadata
+    this.measureDataType = type;
+    this.isNoDictionaryColumn = isNoDictionaryColumn;
+    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
+  }
+
+  /**
+   * Collects the sort temp files of this table and, when there are any,
+   * prepares the heap that the iterator methods read from.
+   *
+   * @throws CarbonDataWriterException when the merge cannot be prepared
+   */
+  public void startFinalMerge() throws CarbonDataWriterException {
+    List<File> filesToMerge = getFilesToMergeSort();
+    if (!filesToMerge.isEmpty()) {
+      startSorting(filesToMerge);
+    } else {
+      LOGGER.info("No file to merge in final merge stage");
+    }
+  }
+
+  /**
+   * Lists, across all temp locations, the files whose name starts with the table name.
+   *
+   * @return matching files; empty when no location contains one
+   */
+  private List<File> getFilesToMergeSort() {
+    List<File> mergeCandidates = new ArrayList<File>(tempFileLocation.length);
+
+    FileFilter tableFileFilter = new FileFilter() {
+      @Override public boolean accept(File candidate) {
+        return candidate.getName().startsWith(tableName);
+      }
+    };
+
+    for (String location : tempFileLocation) {
+      // listFiles returns null when the location is not a readable directory
+      File[] matches = new File(location).listFiles(tableFileFilter);
+      if (null == matches || matches.length == 0) {
+        continue;
+      }
+      for (File match : matches) {
+        mergeCandidates.add(match);
+      }
+    }
+
+    return mergeCandidates;
+  }
+
+  /**
+   * Prepares the final merge: creates the record holder heap, then opens every
+   * given temp file on a thread pool, reads its first row and adds the resulting
+   * chunk holder to the heap.
+   *
+   * @param files sort temp files to merge
+   * @throws CarbonDataWriterException when a temp file cannot be initialized or read,
+   *                                   when the readers do not finish in time, or when
+   *                                   the calling thread is interrupted while waiting
+   */
+  private void startSorting(List<File> files) throws CarbonDataWriterException {
+    this.fileCounter = files.size();
+    if (fileCounter == 0) {
+      LOGGER.info("No files to merge sort");
+      return;
+    }
+    this.fileBufferSize = CarbonDataProcessorUtil
+        .getFileBufferSize(this.fileCounter, CarbonProperties.getInstance(),
+            CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+    LOGGER.info("Number of temp file: " + this.fileCounter);
+
+    LOGGER.info("File Buffer Size: " + this.fileBufferSize);
+
+    // create record holder heap
+    createRecordHolderQueue();
+
+    // iterate over file list and create chunk holder and add to heap
+    LOGGER.info("Started adding first record from each file");
+    int maxThreadForSorting = 0;
+    try {
+      maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
+              CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE));
+    } catch (NumberFormatException e) {
+      // configured value is not a number, fall back to the default
+      maxThreadForSorting =
+          Integer.parseInt(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE);
+    }
+    ExecutorService service = Executors.newFixedThreadPool(maxThreadForSorting);
+
+    // first failure seen by any reader task; guarded by LOCKOBJECT
+    final Throwable[] firstFailure = new Throwable[1];
+
+    for (final File tempFile : files) {
+
+      Runnable runnable = new Runnable() {
+        @Override public void run() {
+          // create chunk holder
+          SortTempFileChunkHolder sortTempFileChunkHolder =
+              new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
+                  measureCount, fileBufferSize, noDictionaryCount, measureDataType,
+                  isNoDictionaryColumn, isNoDictionarySortColumn);
+          try {
+            // initialize
+            sortTempFileChunkHolder.initialize();
+            sortTempFileChunkHolder.readRow();
+          } catch (CarbonSortKeyAndGroupByException ex) {
+            LOGGER.error(ex);
+            synchronized (LOCKOBJECT) {
+              if (null == firstFailure[0]) {
+                firstFailure[0] = ex;
+              }
+            }
+            // a holder that failed to initialize or read must not enter the heap,
+            // otherwise the merge would continue with a broken or missing first row
+            // NOTE(review): the holder's stream is not closed here because it is not
+            // visible whether closeStream() is safe after a failed initialize()
+            return;
+          }
+
+          synchronized (LOCKOBJECT) {
+            recordHolderHeapLocal.add(sortTempFileChunkHolder);
+          }
+        }
+      };
+      service.execute(runnable);
+    }
+    service.shutdown();
+
+    boolean finished;
+    try {
+      finished = service.awaitTermination(2, TimeUnit.HOURS);
+    } catch (InterruptedException e) {
+      service.shutdownNow();
+      // keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+      throw new CarbonDataWriterException(e.getMessage(), e);
+    }
+
+    if (!finished) {
+      // readers are still running, so the heap is incomplete and still being modified
+      service.shutdownNow();
+      String message = "Timed out while reading first record from sort temp files";
+      throw new CarbonDataWriterException(message, new IllegalStateException(message));
+    }
+
+    Throwable failure;
+    synchronized (LOCKOBJECT) {
+      failure = firstFailure[0];
+    }
+    if (null != failure) {
+      throw new CarbonDataWriterException(failure.getMessage(), failure);
+    }
+
+    LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
+  }
+
+  /**
+   * Creates the heap that holds one chunk holder per temp file. No comparator is
+   * passed, so the queue orders the holders by their natural ordering; the initial
+   * capacity is the number of files.
+   */
+  private void createRecordHolderQueue() {
+    int initialCapacity = fileCounter;
+    recordHolderHeapLocal = new PriorityQueue<SortTempFileChunkHolder>(initialCapacity);
+  }
+
+  /**
+   * This method will be used to get the sorted row
+   *
+   * @return sorted row
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  public Object[] next() {
+    return getSortedRecordFromFile();
+  }
+
+  /**
+   * Takes the head chunk holder from the heap and returns its current row.
+   * When the holder has more rows its next row is read and the holder is put
+   * back on the heap; otherwise its stream is closed and the file counter drops.
+   *
+   * @return sorted record
+   * @throws CarbonDataWriterException when the next row of the holder cannot be read
+   */
+  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
+    // the head of the priority queue is the holder whose current row sorts first;
+    // poll removes it in log(n)
+    SortTempFileChunkHolder head = this.recordHolderHeapLocal.poll();
+    Object[] currentRow = head.getRow();
+
+    if (head.hasNext()) {
+      // advance the holder to its next row ...
+      try {
+        head.readRow();
+      } catch (CarbonSortKeyAndGroupByException e) {
+        throw new CarbonDataWriterException(e.getMessage(), e);
+      }
+      // ... and re-insert it so the heap re-orders it by the new row
+      this.recordHolderHeapLocal.add(head);
+    } else {
+      // holder is exhausted: release its stream and count one file less
+      head.closeStream();
+      --this.fileCounter;
+    }
+
+    return currentRow;
+  }
+
+  /**
+   * This method will be used to check whether any more element is present or
+   * not
+   *
+   * @return more element is present
+   */
+  public boolean hasNext() {
+    return this.fileCounter > 0;
+  }
+
+  /**
+   * Drops the reference to the record holder heap.
+   */
+  public void clear() {
+    // assigning null to an already-null field is a no-op, so no guard is needed
+    recordHolderHeapLocal = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
new file mode 100644
index 0000000..fc744a6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class SortDataRows {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SortDataRows.class.getName());
+  /**
+   * entryCount
+   */
+  private int entryCount;
+  /**
+   * record holder array
+   */
+  private Object[][] recordHolderList;
+  /**
+   * threadStatusObserver
+   */
+  private ThreadStatusObserver threadStatusObserver;
+  /**
+   * executor service for data sort holder
+   */
+  private ExecutorService dataSorterAndWriterExecutorService;
+  /**
+   * semaphore which will used for managing sorted data object arrays
+   */
+  private Semaphore semaphore;
+
+  private SortParameters parameters;
+
+  private int sortBufferSize;
+
+  private SortIntermediateFileMerger intermediateFileMerger;
+
+  private final Object addRowsLock = new Object();
+
+  /**
+   * Creates the row sorter.
+   *
+   * @param parameters             sort configuration
+   * @param intermediateFileMerger merger that receives every written sort temp file
+   */
+  public SortDataRows(SortParameters parameters,
+      SortIntermediateFileMerger intermediateFileMerger) {
+    this.parameters = parameters;
+    this.intermediateFileMerger = intermediateFileMerger;
+
+    // the buffer must be able to take at least one full batch
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+    int configuredBufferSize = parameters.getSortBufferSize();
+    this.sortBufferSize = configuredBufferSize > batchSize ? configuredBufferSize : batchSize;
+
+    // observer of writing file in thread
+    this.threadStatusObserver = new ThreadStatusObserver();
+  }
+
+  /**
+   * Allocates the row buffer, recreates the sort temp locations and creates the
+   * thread pool and semaphore used for sorting and writing temp files.
+   *
+   * @throws CarbonSortKeyAndGroupByException when the old sort temp location
+   *                                          cannot be deleted
+   */
+  public void initialize() throws CarbonSortKeyAndGroupByException {
+    // buffer for incoming rows, exactly sortBufferSize entries long
+    recordHolderList = new Object[sortBufferSize][];
+
+    // remove files left over in the sort temp folder
+    deleteSortLocationIfExists();
+
+    // create new sort temp directory
+    CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
+
+    // pool size and permit count are both the configured number of cores
+    dataSorterAndWriterExecutorService =
+        Executors.newFixedThreadPool(parameters.getNumberOfCores());
+    semaphore = new Semaphore(parameters.getNumberOfCores());
+  }
+
+  /**
+   * Adds one row to the buffer. When the buffer is already full, the full buffer
+   * is first handed to a sorter task and replaced by a fresh one.
+   * Unlike {@link #addRowBatch(Object[][], int)} this method does not synchronize.
+   *
+   * @param row new row
+   * @throws CarbonSortKeyAndGroupByException when the thread is interrupted while
+   *                                          waiting for a free sorter slot
+   */
+  public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    if (sortBufferSize == entryCount) {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("************ Writing to temp file ********** ");
+      }
+      intermediateFileMerger.startMergingIfPossible();
+      Object[][] recordHolderListLocal = recordHolderList;
+      try {
+        // the permit is given back by the sorter task when it finishes
+        semaphore.acquire();
+        dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
+      } catch (InterruptedException e) {
+        // restore the interrupt flag so callers further up can still see it
+        Thread.currentThread().interrupt();
+        LOGGER.error(
+            "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+        // keep the original exception as cause instead of only its message
+        throw new CarbonSortKeyAndGroupByException(e.getMessage(), e);
+      }
+      // create the new holder Array
+      this.recordHolderList = new Object[this.sortBufferSize][];
+      this.entryCount = 0;
+    }
+    recordHolderList[entryCount++] = row;
+  }
+
+  /**
+   * Adds the first {@code size} rows of a batch to the buffer. When the batch
+   * fills the buffer, the buffer is topped up from the batch, handed to a sorter
+   * task and replaced by a fresh one that receives the rest of the batch.
+   *
+   * @param rowBatch rows to add
+   * @param size     number of rows of rowBatch to add
+   * @throws CarbonSortKeyAndGroupByException when the full buffer cannot be handed
+   *                                          to a sorter task
+   */
+  public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    synchronized (addRowsLock) {
+      int sizeLeft = 0;
+      if (entryCount + size >= sortBufferSize) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("************ Writing to temp file ********** ");
+        }
+        intermediateFileMerger.startMergingIfPossible();
+        Object[][] recordHolderListLocal = recordHolderList;
+        // free slots of the current buffer; they are filled from the head of the batch
+        sizeLeft = sortBufferSize - entryCount;
+        if (sizeLeft > 0) {
+          System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
+        }
+        try {
+          // the permit is given back by the sorter task when it finishes
+          semaphore.acquire();
+          dataSorterAndWriterExecutorService
+              .execute(new DataSorterAndWriter(recordHolderListLocal));
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) {
+            // restore the interrupt flag so callers further up can still see it
+            Thread.currentThread().interrupt();
+          }
+          LOGGER.error(
+              "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+          throw new CarbonSortKeyAndGroupByException(e);
+        }
+        // create the new holder Array
+        this.recordHolderList = new Object[this.sortBufferSize][];
+        this.entryCount = 0;
+        size = size - sizeLeft;
+        if (size == 0) {
+          return;
+        }
+      }
+      // copy the (remaining) rows of the batch behind the rows already buffered
+      System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
+      entryCount += size;
+    }
+  }
+
+  /**
+   * Flushes the rows that are still buffered: they are sorted and written to a new
+   * sort temp file in a randomly chosen temp location. Afterwards the method waits
+   * for the sorter tasks to finish and releases the buffer.
+   *
+   * @throws CarbonSortKeyAndGroupByException when the temp file cannot be written or
+   *                                          waiting for the sorter tasks fails
+   */
+  public void startSorting() throws CarbonSortKeyAndGroupByException {
+    LOGGER.info("File based sorting will be used");
+    if (this.entryCount > 0) {
+      // sort only the filled part of the buffer
+      Object[][] pendingRows = Arrays.copyOf(recordHolderList, entryCount);
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
+        Arrays.sort(pendingRows, new NewRowComparator(parameters.getNoDictionarySortColumn()));
+      } else {
+        Arrays.sort(pendingRows,
+            new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
+      }
+      recordHolderList = pendingRows;
+
+      // create new file and choose folder randomly
+      String[] tempLocations = parameters.getTempFileLocation();
+      int chosenIndex = new Random().nextInt(tempLocations.length);
+      String fileName = parameters.getTableName() + System.nanoTime()
+          + CarbonCommonConstants.SORT_TEMP_FILE_EXT;
+      File sortTempFile = new File(tempLocations[chosenIndex] + File.separator + fileName);
+      writeDataTofile(recordHolderList, this.entryCount, sortTempFile);
+    }
+
+    startFileBasedMerge();
+    this.recordHolderList = null;
+  }
+
+  /**
+   * Writes rows to a sort temp file. The chunked temp file writer is used when
+   * sort file compression or prefetch is enabled, the plain stream writer otherwise.
+   *
+   * @param recordHolderList rows to write
+   * @param entryCountLocal  number of rows to write
+   * @param file             target file
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  private void writeDataTofile(Object[][] recordHolderList, int entryCountLocal, File file)
+      throws CarbonSortKeyAndGroupByException {
+    boolean useTempFileWriter =
+        parameters.isSortFileCompressionEnabled() || parameters.isPrefetch();
+    if (useTempFileWriter) {
+      writeSortTempFile(recordHolderList, entryCountLocal, file);
+    } else {
+      writeData(recordHolderList, entryCountLocal, file);
+    }
+  }
+
+  /**
+   * Writes rows through the writer returned by getWriter() and always finishes
+   * the writer afterwards.
+   *
+   * @param recordHolderList rows to write
+   * @param entryCountLocal  number of rows, passed to the writer on initialization
+   * @param file             target file
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  private void writeSortTempFile(Object[][] recordHolderList, int entryCountLocal, File file)
+      throws CarbonSortKeyAndGroupByException {
+    // obtained before the try block: if this fails there is nothing to finish
+    TempSortFileWriter tempFileWriter = getWriter();
+    try {
+      tempFileWriter.initiaize(file, entryCountLocal);
+      tempFileWriter.writeSortTempFile(recordHolderList);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      LOGGER.error(e, "Problem while writing the sort temp file");
+      throw e;
+    } finally {
+      tempFileWriter.finish();
+    }
+  }
+
+  /**
+   * Writes rows to the given file with a buffered DataOutputStream.
+   * Layout: the entry count as int, then for every row its dimensions, its complex
+   * dimensions and its measures in that order.
+   *
+   * @param recordHolderList rows to write
+   * @param entryCountLocal  number of rows of recordHolderList to write
+   * @param file             target file
+   * @throws CarbonSortKeyAndGroupByException when writing fails with an IOException
+   */
+  private void writeData(Object[][] recordHolderList, int entryCountLocal, File file)
+      throws CarbonSortKeyAndGroupByException {
+    DataOutputStream stream = null;
+    try {
+      // open stream
+      stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
+          parameters.getFileWriteBufferSize()));
+
+      // write number of entries to the file
+      stream.writeInt(entryCountLocal);
+      int complexDimColCount = parameters.getComplexDimColCount();
+      // index of the first measure inside a row
+      int dimColCount = parameters.getDimColCount() + complexDimColCount;
+      DataType[] type = parameters.getMeasureDataType();
+      boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
+      Object[] row = null;
+      for (int i = 0; i < entryCountLocal; i++) {
+        // get row from record holder list
+        row = recordHolderList[i];
+        int dimCount = 0;
+        // write dictionary and non dictionary dimensions here.
+        for (; dimCount < noDictionaryDimnesionMapping.length; dimCount++) {
+          if (noDictionaryDimnesionMapping[dimCount]) {
+            // no dictionary dimension: length as short, then the bytes
+            // NOTE(review): writeShort keeps only the low 16 bits of the length
+            byte[] col = (byte[]) row[dimCount];
+            stream.writeShort(col.length);
+            stream.write(col);
+          } else {
+            // dictionary dimension: a single int
+            stream.writeInt((int)row[dimCount]);
+          }
+        }
+        // write complex dimensions here.
+        // dimCount continues from the loop above; each value is length as short + bytes
+        for (; dimCount < dimColCount; dimCount++) {
+          byte[] value = (byte[])row[dimCount];
+          stream.writeShort(value.length);
+          stream.write(value);
+        }
+        // as measures are stored in separate array.
+        for (int mesCount = 0;
+             mesCount < parameters.getMeasureColCount(); mesCount++) {
+          Object value = row[mesCount + dimColCount];
+          if (null != value) {
+            // flag byte 1: a value follows
+            stream.write((byte) 1);
+            switch (type[mesCount]) {
+              case SHORT:
+                stream.writeShort((Short) value);
+                break;
+              case INT:
+                stream.writeInt((Integer) value);
+                break;
+              case LONG:
+                stream.writeLong((Long) value);
+                break;
+              case DOUBLE:
+                stream.writeDouble((Double) value);
+                break;
+              case DECIMAL:
+                // decimal: byte length as int, then the bytes
+                BigDecimal val = (BigDecimal) value;
+                byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+                stream.writeInt(bigDecimalInBytes.length);
+                stream.write(bigDecimalInBytes);
+                break;
+              default:
+                throw new IllegalArgumentException("unsupported data type:" + type[mesCount]);
+            }
+          } else {
+            // flag byte 0: measure is null, nothing else is written for it
+            stream.write((byte) 0);
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+    } finally {
+      // close streams
+      CarbonUtil.closeStreams(stream);
+    }
+  }
+
+  /**
+   * Builds the writer for sort temp files: the writer obtained from the factory,
+   * wrapped in a SortTempFileChunkWriter. The second constructor argument of the
+   * wrapper is the buffer size when prefetch is on and compression is off, and the
+   * configured number of records in compression otherwise.
+   *
+   * @return chunk writer wrapping the factory writer
+   */
+  private TempSortFileWriter getWriter() {
+    TempSortFileWriter delegate = TempSortFileWriterFactory.getInstance()
+        .getTempSortFileWriter(parameters.isSortFileCompressionEnabled(),
+            parameters.getDimColCount(), parameters.getComplexDimColCount(),
+            parameters.getMeasureColCount(), parameters.getNoDictionaryCount(),
+            parameters.getFileWriteBufferSize());
+
+    if (parameters.isPrefetch() && !parameters.isSortFileCompressionEnabled()) {
+      return new SortTempFileChunkWriter(delegate, parameters.getBufferSize());
+    }
+    return new SortTempFileChunkWriter(delegate,
+        parameters.getSortTempFileNoOFRecordsInCompression());
+  }
+
+  /**
+   * This method will be used to delete sort temp location is it is exites
+   *
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
+    CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
+  }
+
+  /**
+   * Stops accepting new sorter tasks and waits, for at most two days, until the
+   * submitted ones have finished.
+   *
+   * @throws CarbonSortKeyAndGroupByException when the thread is interrupted while waiting
+   */
+  private void startFileBasedMerge() throws CarbonSortKeyAndGroupByException {
+    try {
+      dataSorterAndWriterExecutorService.shutdown();
+      dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      // restore the interrupt flag so callers further up can still see it
+      Thread.currentThread().interrupt();
+      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
+    }
+  }
+
+  /**
+   * Failure hook for the sorter tasks: when one of them fails, the remaining
+   * work is stopped and the failure is published.
+   */
+  private class ThreadStatusObserver {
+    /**
+     * Stops the sorter pool, closes the intermediate file merger, marks the sort
+     * observer as failed, logs the failure and rethrows it wrapped.
+     *
+     * @param exception failure raised by a sorter task
+     * @throws CarbonSortKeyAndGroupByException always, wrapping the given failure
+     */
+    public void notifyFailed(final Throwable exception) throws CarbonSortKeyAndGroupByException {
+      // stop the tasks that are still queued or running
+      SortDataRows.this.dataSorterAndWriterExecutorService.shutdownNow();
+      SortDataRows.this.intermediateFileMerger.close();
+      // publish the failure through the observer of the sort parameters
+      SortDataRows.this.parameters.getObserver().setFailed(true);
+      LOGGER.error(exception);
+      throw new CarbonSortKeyAndGroupByException(exception);
+    }
+  }
+
+  /**
+   * Task that sorts one full row buffer, writes it to a new sort temp file and
+   * registers that file with the intermediate file merger.
+   */
+  private class DataSorterAndWriter implements Runnable {
+    private Object[][] recordHolderArray;
+
+    public DataSorterAndWriter(Object[][] recordHolderArray) {
+      this.recordHolderArray = recordHolderArray;
+    }
+
+    /**
+     * Sorts and writes the buffer. Any failure is reported to the thread status
+     * observer; the semaphore permit is released in every case.
+     */
+    @Override
+    public void run() {
+      try {
+        long startTime = System.currentTimeMillis();
+        sortRecords();
+
+        File sortTempFile = newSortTempFile();
+        writeDataTofile(recordHolderArray, recordHolderArray.length, sortTempFile);
+        // hand the file over so the merger can include it in an intermediate merge
+        intermediateFileMerger.addFileToMerge(sortTempFile);
+
+        long elapsedMillis = System.currentTimeMillis() - startTime;
+        LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: "
+            + elapsedMillis);
+      } catch (Throwable e) {
+        try {
+          threadStatusObserver.notifyFailed(e);
+        } catch (CarbonSortKeyAndGroupByException ex) {
+          // notifyFailed rethrows the wrapped failure; on this worker thread
+          // it can only be logged
+          LOGGER.error(ex);
+        }
+      } finally {
+        semaphore.release();
+      }
+    }
+
+    /**
+     * Sorts the buffer with the comparator matching the sort column configuration.
+     */
+    private void sortRecords() {
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
+        Arrays.sort(recordHolderArray,
+            new NewRowComparator(parameters.getNoDictionarySortColumn()));
+      } else {
+        Arrays.sort(recordHolderArray,
+            new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
+      }
+    }
+
+    /**
+     * Builds the path of a new sort temp file in a randomly chosen temp location.
+     */
+    private File newSortTempFile() {
+      String[] tempLocations = parameters.getTempFileLocation();
+      int chosenIndex = new Random().nextInt(tempLocations.length);
+      String fileName = parameters.getTableName() + System.nanoTime()
+          + CarbonCommonConstants.SORT_TEMP_FILE_EXT;
+      return new File(tempLocations[chosenIndex] + File.separator + fileName);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
new file mode 100644
index 0000000..d234ce2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Collects sort temp files and merges them in batches into larger intermediate files. The merge
+ * tasks run on a fixed-size thread pool whose size is SortParameters.getNumberOfCores().
+ */
+public class SortIntermediateFileMerger {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SortIntermediateFileMerger.class.getName());
+
+  /**
+   * thread pool on which the intermediate merge tasks are executed
+   */
+  private final ExecutorService executorService;
+  /**
+   * sort temp files waiting to be merged; every access is guarded by lockObject
+   */
+  private List<File> procFiles;
+
+  private final SortParameters parameters;
+
+  private final Object lockObject = new Object();
+
+  public SortIntermediateFileMerger(SortParameters parameters) {
+    this.parameters = parameters;
+    // processed file list
+    this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
+  }
+
+  /**
+   * Registers a sort temp file as a candidate for intermediate merging. Adding a file never
+   * starts a merge by itself; that only happens in startMergingIfPossible().
+   *
+   * @param sortTempFile sort temp file to be merged later
+   */
+  public void addFileToMerge(File sortTempFile) {
+    synchronized (lockObject) {
+      procFiles.add(sortTempFile);
+    }
+  }
+
+  /**
+   * Submits one intermediate merge for all pending files when their number has reached
+   * SortParameters.getNumberOfIntermediateFileToBeMerged(); otherwise does nothing.
+   */
+  public void startMergingIfPossible() {
+    File[] fileList;
+    // the size check and the hand-over of the list happen under the same lock, so a concurrent
+    // addFileToMerge() can neither be missed nor observe a half-swapped list
+    synchronized (lockObject) {
+      if (procFiles.size() < parameters.getNumberOfIntermediateFileToBeMerged()) {
+        return;
+      }
+      fileList = procFiles.toArray(new File[procFiles.size()]);
+      this.procFiles = new ArrayList<File>();
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
+    }
+    startIntermediateMerging(fileList);
+  }
+
+  /**
+   * Below method will be used to start the intermediate file merging. The merged file is created
+   * in a randomly chosen temp file location.
+   *
+   * @param intermediateFiles sort temp files to be merged into one file
+   */
+  private void startIntermediateMerging(File[] intermediateFiles) {
+    String[] tempFileLocations = parameters.getTempFileLocation();
+    String chosenTempDir = tempFileLocations[new Random().nextInt(tempFileLocations.length)];
+    File file = new File(
+        chosenTempDir + File.separator + parameters.getTableName() + System
+            .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
+    IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
+    executorService.execute(merger);
+  }
+
+  /**
+   * Stops accepting new merge tasks, waits for the submitted ones to complete and releases the
+   * list of pending files.
+   *
+   * @throws CarbonSortKeyAndGroupByException if the waiting thread is interrupted
+   */
+  public void finish() throws CarbonSortKeyAndGroupByException {
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      // restore the interrupt status so that code further up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
+    }
+    synchronized (lockObject) {
+      // null check keeps a repeated finish() from failing with a NullPointerException
+      if (procFiles != null) {
+        procFiles.clear();
+        procFiles = null;
+      }
+    }
+  }
+
+  /**
+   * Forcibly stops the thread pool unless all of its tasks have already completed after a
+   * shutdown. This also covers the case where finish() was never reached, in which the pool
+   * would otherwise keep its threads alive.
+   */
+  public void close() {
+    if (!executorService.isTerminated()) {
+      executorService.shutdownNow();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortObserver.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortObserver.java
new file mode 100644
index 0000000..681e60b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortObserver.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder of a single failure flag.
+ */
+public class SortObserver implements Serializable {
+
+  /**
+   * failure flag, false until changed through setFailed
+   */
+  private boolean isFailed;
+
+  /**
+   * Reads the failure flag.
+   *
+   * @return the value last given to setFailed, or false when it was never called
+   */
+  public boolean isFailed() {
+    return this.isFailed;
+  }
+
+  /**
+   * Overwrites the failure flag.
+   *
+   * @param isFailed new value of the failure flag
+   */
+  public void setFailed(boolean isFailed) {
+    this.isFailed = isFailed;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
new file mode 100644
index 0000000..39e1049
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class SortParameters implements Serializable {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SortParameters.class.getName());
+  /**
+   * directories in which sort temp files and intermediate merged files are created
+   */
+  private String[] tempFileLocation;
+  /**
+   * sortBufferSize, read from the SORT_SIZE property by the factory methods
+   */
+  private int sortBufferSize;
+  /**
+   * measure count
+   */
+  private int measureColCount;
+  /**
+   * dimension count; the factory methods pass the dimension count minus the complex column count
+   */
+  private int dimColCount;
+  /**
+   * complex dimension count
+   */
+  private int complexDimColCount;
+  /**
+   * fileBufferSize
+   */
+  private int fileBufferSize;
+  /**
+   * number of pending sort temp files at which an intermediate merge is submitted
+   */
+  private int numberOfIntermediateFileToBeMerged;
+  /**
+   * fileWriteBufferSize
+   */
+  private int fileWriteBufferSize;
+  /**
+   * observer holding the failure flag; the factory methods create a new one per parameters object
+   */
+  private SortObserver observer;
+  /**
+   * sortTempFileNoOFRecordsInCompression; the factory methods fall back to the default value when
+   * the configured value is not a positive integer
+   */
+  private int sortTempFileNoOFRecordsInCompression;
+  /**
+   * whether compression is used for writing the sort temp files
+   */
+  private boolean isSortFileCompressionEnabled;
+  /**
+   * prefetch
+   */
+  private boolean prefetch;
+  /**
+   * bufferSize, read from the CARBON_PREFETCH_BUFFERSIZE property by the factory methods
+   */
+  private int bufferSize;
+
+  private String databaseName;
+
+  private String tableName;
+
+  private DataType[] measureDataType;
+
+  /**
+   * To know how many columns are of high cardinality.
+   */
+  private int noDictionaryCount;
+  /**
+   * partitionID
+   */
+  private String partitionID;
+  /**
+   * Id of the load folder
+   */
+  private String segmentId;
+  /**
+   * task id, each spark task has a unique id
+   */
+  private String taskNo;
+
+  /**
+   * This will tell whether dimension is dictionary or not.
+   */
+  private boolean[] noDictionaryDimnesionColumn;
+
+  /**
+   * the first numberOfSortColumns entries of noDictionaryDimnesionColumn (or that very array when
+   * the lengths are equal), as filled in by the factory methods
+   */
+  private boolean[] noDictionarySortColumn;
+
+  /**
+   * number of sort columns; the setter caps it at dimColCount
+   */
+  private int numberOfSortColumns;
+
+  /**
+   * number of no-dictionary sort columns; the setter caps it at noDictionaryCount
+   */
+  private int numberOfNoDictSortColumns;
+
+  /**
+   * thread count for intermediate merging; the factory methods use half of the configured
+   * NUM_CORES_LOADING value and never less than 1
+   */
+  private int numberOfCores;
+
+  private int batchSortSizeinMb;
+
+  /**
+   * Creates a new SortParameters object holding the same field values as this one. Arrays and
+   * the observer are assigned by reference, so they are shared with the returned object.
+   *
+   * @return the newly created copy
+   */
+  public SortParameters getCopy() {
+    SortParameters copy = new SortParameters();
+
+    // table and load identity
+    copy.databaseName = this.databaseName;
+    copy.tableName = this.tableName;
+    copy.partitionID = this.partitionID;
+    copy.segmentId = this.segmentId;
+    copy.taskNo = this.taskNo;
+
+    // column layout
+    copy.measureColCount = this.measureColCount;
+    copy.dimColCount = this.dimColCount;
+    copy.complexDimColCount = this.complexDimColCount;
+    copy.measureDataType = this.measureDataType;
+    copy.noDictionaryCount = this.noDictionaryCount;
+    copy.noDictionaryDimnesionColumn = this.noDictionaryDimnesionColumn;
+    copy.noDictionarySortColumn = this.noDictionarySortColumn;
+    copy.numberOfSortColumns = this.numberOfSortColumns;
+    copy.numberOfNoDictSortColumns = this.numberOfNoDictSortColumns;
+
+    // temp files, buffers and merging
+    copy.tempFileLocation = this.tempFileLocation;
+    copy.sortBufferSize = this.sortBufferSize;
+    copy.fileBufferSize = this.fileBufferSize;
+    copy.fileWriteBufferSize = this.fileWriteBufferSize;
+    copy.numberOfIntermediateFileToBeMerged = this.numberOfIntermediateFileToBeMerged;
+    copy.sortTempFileNoOFRecordsInCompression = this.sortTempFileNoOFRecordsInCompression;
+    copy.isSortFileCompressionEnabled = this.isSortFileCompressionEnabled;
+    copy.prefetch = this.prefetch;
+    copy.bufferSize = this.bufferSize;
+    copy.batchSortSizeinMb = this.batchSortSizeinMb;
+
+    // execution
+    copy.numberOfCores = this.numberOfCores;
+    copy.observer = this.observer;
+
+    return copy;
+  }
+
+  /**
+   * @return the internal array of temp file locations (not a copy)
+   */
+  public String[] getTempFileLocation() {
+    return tempFileLocation;
+  }
+
+  public void setTempFileLocation(String[] tempFileLocation) {
+    this.tempFileLocation = tempFileLocation;
+  }
+
+  public int getSortBufferSize() {
+    return sortBufferSize;
+  }
+
+  public void setSortBufferSize(int sortBufferSize) {
+    this.sortBufferSize = sortBufferSize;
+  }
+
+  public int getMeasureColCount() {
+    return measureColCount;
+  }
+
+  public void setMeasureColCount(int measureColCount) {
+    this.measureColCount = measureColCount;
+  }
+
+  public int getDimColCount() {
+    return dimColCount;
+  }
+
+  /**
+   * @param dimColCount dimension count; must be set before setNumberOfSortColumns is called
+   *                    because that setter caps its argument at this value
+   */
+  public void setDimColCount(int dimColCount) {
+    this.dimColCount = dimColCount;
+  }
+
+  public int getComplexDimColCount() {
+    return complexDimColCount;
+  }
+
+  public void setComplexDimColCount(int complexDimColCount) {
+    this.complexDimColCount = complexDimColCount;
+  }
+
+  public int getFileBufferSize() {
+    return fileBufferSize;
+  }
+
+  public void setFileBufferSize(int fileBufferSize) {
+    this.fileBufferSize = fileBufferSize;
+  }
+
+  public int getNumberOfIntermediateFileToBeMerged() {
+    return numberOfIntermediateFileToBeMerged;
+  }
+
+  public void setNumberOfIntermediateFileToBeMerged(int numberOfIntermediateFileToBeMerged) {
+    this.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
+  }
+
+  public int getFileWriteBufferSize() {
+    return fileWriteBufferSize;
+  }
+
+  public void setFileWriteBufferSize(int fileWriteBufferSize) {
+    this.fileWriteBufferSize = fileWriteBufferSize;
+  }
+
+  public SortObserver getObserver() {
+    return observer;
+  }
+
+  public void setObserver(SortObserver observer) {
+    this.observer = observer;
+  }
+
+  public int getSortTempFileNoOFRecordsInCompression() {
+    return sortTempFileNoOFRecordsInCompression;
+  }
+
+  public void setSortTempFileNoOFRecordsInCompression(int sortTempFileNoOFRecordsInCompression) {
+    this.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
+  }
+
+  public boolean isSortFileCompressionEnabled() {
+    return isSortFileCompressionEnabled;
+  }
+
+  public void setSortFileCompressionEnabled(boolean sortFileCompressionEnabled) {
+    isSortFileCompressionEnabled = sortFileCompressionEnabled;
+  }
+
+  public boolean isPrefetch() {
+    return prefetch;
+  }
+
+  public void setPrefetch(boolean prefetch) {
+    this.prefetch = prefetch;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  public void setBufferSize(int bufferSize) {
+    this.bufferSize = bufferSize;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @return the internal array of measure data types (not a copy)
+   */
+  public DataType[] getMeasureDataType() {
+    return measureDataType;
+  }
+
+  public void setMeasureDataType(DataType[] measureDataType) {
+    this.measureDataType = measureDataType;
+  }
+
+  public int getNoDictionaryCount() {
+    return noDictionaryCount;
+  }
+
+  /**
+   * @param noDictionaryCount no-dictionary column count; must be set before
+   *                          setNumberOfNoDictSortColumns is called because that setter caps its
+   *                          argument at this value
+   */
+  public void setNoDictionaryCount(int noDictionaryCount) {
+    this.noDictionaryCount = noDictionaryCount;
+  }
+
+  public String getPartitionID() {
+    return partitionID;
+  }
+
+  public void setPartitionID(String partitionID) {
+    this.partitionID = partitionID;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  public String getTaskNo() {
+    return taskNo;
+  }
+
+  public void setTaskNo(String taskNo) {
+    this.taskNo = taskNo;
+  }
+
+  /**
+   * @return the internal no-dictionary mapping array (not a copy)
+   */
+  public boolean[] getNoDictionaryDimnesionColumn() {
+    return noDictionaryDimnesionColumn;
+  }
+
+  public void setNoDictionaryDimnesionColumn(boolean[] noDictionaryDimnesionColumn) {
+    this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+  }
+
+  public int getNumberOfCores() {
+    return numberOfCores;
+  }
+
+  public void setNumberOfCores(int numberOfCores) {
+    this.numberOfCores = numberOfCores;
+  }
+
+  public int getNumberOfSortColumns() {
+    return numberOfSortColumns;
+  }
+
+  /**
+   * Stores the number of sort columns, capped at the current dimColCount. The cap is applied
+   * once, at the time of this call: a dimColCount that is still 0 results in 0 sort columns.
+   *
+   * @param numberOfSortColumns requested number of sort columns
+   */
+  public void setNumberOfSortColumns(int numberOfSortColumns) {
+    this.numberOfSortColumns = Math.min(numberOfSortColumns, this.dimColCount);
+  }
+
+  /**
+   * @return the internal no-dictionary sort column mapping array (not a copy)
+   */
+  public boolean[] getNoDictionarySortColumn() {
+    return noDictionarySortColumn;
+  }
+
+  public void setNoDictionarySortColumn(boolean[] noDictionarySortColumn) {
+    this.noDictionarySortColumn = noDictionarySortColumn;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return numberOfNoDictSortColumns;
+  }
+
+  /**
+   * Stores the number of no-dictionary sort columns, capped at the current noDictionaryCount.
+   * The cap is applied once, at the time of this call.
+   *
+   * @param numberOfNoDictSortColumns requested number of no-dictionary sort columns
+   */
+  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
+    this.numberOfNoDictSortColumns = Math.min(numberOfNoDictSortColumns, noDictionaryCount);
+  }
+
+  public int getBatchSortSizeinMb() {
+    return batchSortSizeinMb;
+  }
+
+  public void setBatchSortSizeinMb(int batchSortSizeinMb) {
+    this.batchSortSizeinMb = batchSortSizeinMb;
+  }
+
+  /**
+   * Builds the sort parameters for a data load from the load configuration and the carbon
+   * properties.
+   *
+   * @param configuration load configuration supplying the table identity, column counts and
+   *                      sort column information
+   * @return the populated sort parameters
+   */
+  public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
+    SortParameters parameters = new SortParameters();
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    CarbonProperties carbonProperties = CarbonProperties.getInstance();
+    parameters.setDatabaseName(tableIdentifier.getDatabaseName());
+    parameters.setTableName(tableIdentifier.getTableName());
+    parameters.setPartitionID(configuration.getPartitionId());
+    parameters.setSegmentId(configuration.getSegmentId());
+    parameters.setTaskNo(configuration.getTaskNo());
+    parameters.setMeasureColCount(configuration.getMeasureCount());
+    parameters.setDimColCount(
+        configuration.getDimensionCount() - configuration.getComplexColumnCount());
+    parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
+    parameters.setComplexDimColCount(configuration.getComplexColumnCount());
+    parameters.setNoDictionaryDimnesionColumn(
+        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+    parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
+
+    // both setters cap their argument at the dimension / no-dictionary counts set above
+    parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
+    parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
+    setNoDictionarySortColumnMapping(parameters);
+    parameters.setObserver(new SortObserver());
+    loadSortBufferProperties(parameters, carbonProperties);
+
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+            tableIdentifier.getTableName(), configuration.getTaskNo(),
+            configuration.getPartitionId(), configuration.getSegmentId(), false, false);
+    applySortTempFileLocation(parameters, carbonDataDirectoryPath);
+
+    loadMergeAndCompressionProperties(parameters, carbonProperties);
+
+    DataType[] measureDataType = configuration.getMeasureDataType();
+    parameters.setMeasureDataType(measureDataType);
+    return parameters;
+  }
+
+  /**
+   * this method will set the boolean mapping for no dictionary sort columns: the no dictionary
+   * dimension mapping itself when every entry belongs to a sort column, otherwise a copy of its
+   * first getNumberOfSortColumns() entries
+   *
+   * @param parameters sort parameters whose number of sort columns and no dictionary dimension
+   *                   mapping are already set
+   */
+  private static void setNoDictionarySortColumnMapping(SortParameters parameters) {
+    boolean[] noDictionaryDimension = parameters.getNoDictionaryDimnesionColumn();
+    int sortColumnCount = parameters.getNumberOfSortColumns();
+    if (sortColumnCount == noDictionaryDimension.length) {
+      parameters.setNoDictionarySortColumn(noDictionaryDimension);
+      return;
+    }
+    boolean[] noDictionarySortColumnTemp = new boolean[sortColumnCount];
+    System.arraycopy(noDictionaryDimension, 0, noDictionarySortColumnTemp, 0, sortColumnCount);
+    parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
+  }
+
+  /**
+   * Builds the sort parameters from explicitly passed table details and the carbon properties.
+   *
+   * @param carbonTable           table supplying the number of (no dictionary) sort columns
+   * @param databaseName          database name
+   * @param tableName             table name
+   * @param dimColCount           dimension count including complex dimensions
+   * @param complexDimColCount    complex dimension count
+   * @param measureColCount       measure count
+   * @param noDictionaryCount     no dictionary column count
+   * @param partitionID           partition id
+   * @param segmentId             segment id
+   * @param taskNo                task number
+   * @param noDictionaryColMaping no dictionary mapping of the dimensions
+   * @param isCompactionFlow      passed on when resolving the local data folder location
+   * @return the populated sort parameters
+   */
+  public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
+      String tableName, int dimColCount, int complexDimColCount, int measureColCount,
+      int noDictionaryCount, String partitionID, String segmentId, String taskNo,
+      boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
+    SortParameters parameters = new SortParameters();
+    CarbonProperties carbonProperties = CarbonProperties.getInstance();
+    parameters.setDatabaseName(databaseName);
+    parameters.setTableName(tableName);
+    parameters.setPartitionID(partitionID);
+    parameters.setSegmentId(segmentId);
+    parameters.setTaskNo(taskNo);
+    parameters.setMeasureColCount(measureColCount);
+    // each count is set right before the setter that caps its argument at that count
+    parameters.setDimColCount(dimColCount - complexDimColCount);
+    parameters.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
+    parameters.setNoDictionaryCount(noDictionaryCount);
+    parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
+    parameters.setComplexDimColCount(complexDimColCount);
+    parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
+    parameters.setObserver(new SortObserver());
+    loadSortBufferProperties(parameters, carbonProperties);
+
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId,
+            isCompactionFlow, false);
+    applySortTempFileLocation(parameters, carbonDataDirectoryPath);
+
+    loadMergeAndCompressionProperties(parameters, carbonProperties);
+
+    DataType[] type = CarbonDataProcessorUtil
+        .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
+            parameters.getTableName());
+    parameters.setMeasureDataType(type);
+    setNoDictionarySortColumnMapping(parameters);
+    return parameters;
+  }
+
+  /**
+   * Reads the sort buffer size, the intermediate file merge limit and the file buffer size from
+   * the carbon properties and logs each of them.
+   */
+  private static void loadSortBufferProperties(SortParameters parameters,
+      CarbonProperties carbonProperties) {
+    // get sort buffer size
+    parameters.setSortBufferSize(Integer.parseInt(carbonProperties
+        .getProperty(CarbonCommonConstants.SORT_SIZE,
+            CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)));
+    LOGGER.info("Sort size for table: " + parameters.getSortBufferSize());
+    // set number of intermediate files to merge
+    parameters.setNumberOfIntermediateFileToBeMerged(Integer.parseInt(carbonProperties
+        .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+            CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)));
+
+    LOGGER.info("Number of intermediate file to be merged: " + parameters
+        .getNumberOfIntermediateFileToBeMerged());
+
+    // get file buffer size
+    parameters.setFileBufferSize(CarbonDataProcessorUtil
+        .getFileBufferSize(parameters.getNumberOfIntermediateFileToBeMerged(), carbonProperties,
+            CarbonCommonConstants.CONSTANT_SIZE_TEN));
+
+    LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
+  }
+
+  /**
+   * Appends the sort temp folder name to every local data folder, stores the result as the temp
+   * file locations and logs them.
+   */
+  private static void applySortTempFileLocation(SortParameters parameters,
+      String[] carbonDataDirectoryPath) {
+    String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
+        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    parameters.setTempFileLocation(sortTempDirs);
+    LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
+  }
+
+  /**
+   * Reads the number of cores, the file write buffer size, the sort temp file compression
+   * settings and the prefetch settings from the carbon properties.
+   */
+  private static void loadMergeAndCompressionProperties(SortParameters parameters,
+      CarbonProperties carbonProperties) {
+    int numberOfCores;
+    try {
+      numberOfCores = Integer.parseInt(carbonProperties
+          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
+              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+      numberOfCores = numberOfCores / 2;
+    } catch (NumberFormatException exc) {
+      // NOTE(review): unlike the configured value, this fallback is not halved - confirm intent
+      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    }
+    parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
+
+    parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
+            CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
+
+    parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
+        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
+            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
+
+    parameters.setSortTempFileNoOFRecordsInCompression(
+        readSortTempFileNoOFRecordsInCompression(carbonProperties));
+
+    if (parameters.isSortFileCompressionEnabled()) {
+      LOGGER.info("Compression will be used for writing the sort temp File");
+    }
+
+    parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
+    parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
+        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+        CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
+  }
+
+  /**
+   * Reads the number of records per compression block of a sort temp file. A value that is not
+   * an integer or is smaller than 1 is reported as an error and replaced by the default value.
+   */
+  private static int readSortTempFileNoOFRecordsInCompression(CarbonProperties carbonProperties) {
+    int sortTempFileNoOFRecordsInCompression;
+    try {
+      sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
+          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
+              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
+      if (sortTempFileNoOFRecordsInCompression < 1) {
+        LOGGER.error("Invalid value for: "
+            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+            + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
+            + "be used");
+
+        sortTempFileNoOFRecordsInCompression = Integer.parseInt(
+            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error(
+          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+              + ", only Positive Integer value is allowed. Default value will be used");
+
+      sortTempFileNoOFRecordsInCompression = Integer
+          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+    }
+    return sortTempFileNoOFRecordsInCompression;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
new file mode 100644
index 0000000..c4b0b31
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SortTempFileChunkHolder.class.getName());
+
+  /**
+   * temp file
+   */
+  private File tempFile;
+
+  /**
+   * read stream
+   */
+  private DataInputStream stream;
+
+  /**
+   * entry count
+   */
+  private int entryCount;
+
+  /**
+   * number record read
+   */
+  private int numberOfObjectRead;
+
+  /**
+   * return row
+   */
+  private Object[] returnRow;
+
+  /**
+   * number of measures
+   */
+  private int measureCount;
+
+  /**
+   * number of dimensionCount
+   */
+  private int dimensionCount;
+
+  /**
+   * number of complexDimensionCount
+   */
+  private int complexDimensionCount;
+
+  /**
+   * fileBufferSize for file reader stream size
+   */
+  private int fileBufferSize;
+
+  private Object[][] currentBuffer;
+
+  private Object[][] backupBuffer;
+
+  private boolean isBackupFilled;
+
+  private boolean prefetch;
+
+  private int bufferSize;
+
+  private int bufferRowCounter;
+
+  private ExecutorService executorService;
+
+  private Future<Void> submit;
+
+  private int prefetchRecordsProceesed;
+
+  /**
+   * sortTempFileNoOFRecordsInCompression
+   */
+  private int sortTempFileNoOFRecordsInCompression;
+
+  /**
+   * isSortTempFileCompressionEnabled
+   */
+  private boolean isSortTempFileCompressionEnabled;
+
+  /**
+   * totalRecordFetch
+   */
+  private int totalRecordFetch;
+
+  private int noDictionaryCount;
+
+  private DataType[] aggType;
+
+  /**
+   * to store whether dimension is of dictionary type or not
+   */
+  private boolean[] isNoDictionaryDimensionColumn;
+
+  /**
+   * to store whether sort column is of dictionary type or not
+   */
+  private boolean[] isNoDictionarySortColumn;
+
+  /**
+   * Creates a holder for one sort temp file. The file is not opened here; the read stream is
+   * created when {@link #initialize()} is called.
+   *
+   * @param tempFile                      sort temp file to read
+   * @param dimensionCount                number of dimensions
+   * @param complexDimensionCount         number of complex dimensions
+   * @param measureCount                  number of measures
+   * @param fileBufferSize                buffer size of the file reader stream
+   * @param noDictionaryCount             number of no-dictionary dimensions
+   * @param aggType                       data type of each measure, indexed by measure
+   * @param isNoDictionaryDimensionColumn per dimension, true when it is a no-dictionary dimension
+   * @param isNoDictionarySortColumn      per sort column, true when it is a no-dictionary column
+   */
+  public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
+      int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
+      boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
+    // file to read and the buffer size of the stream that will read it
+    this.tempFile = tempFile;
+    this.fileBufferSize = fileBufferSize;
+
+    // column counts describing the layout of a row
+    this.dimensionCount = dimensionCount;
+    this.noDictionaryCount = noDictionaryCount;
+    this.complexDimensionCount = complexDimensionCount;
+    this.measureCount = measureCount;
+
+    // per column type information
+    this.aggType = aggType;
+    this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
+    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
+
+    // single worker thread that fills the backup buffer
+    this.executorService = Executors.newFixedThreadPool(1);
+  }
+
+  /**
+   * Loads the prefetch, buffer size and sort temp file compression settings from
+   * {@link CarbonProperties} and then opens the temp file. A records-per-compression-chunk
+   * setting that is not a positive integer is logged and replaced by the default value.
+   *
+   * @throws CarbonSortKeyAndGroupByException problem while initializing
+   */
+  public void initialize() throws CarbonSortKeyAndGroupByException {
+    CarbonProperties properties = CarbonProperties.getInstance();
+    prefetch = Boolean.parseBoolean(
+        properties.getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
+            CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
+    bufferSize = Integer.parseInt(
+        properties.getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+            CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
+    isSortTempFileCompressionEnabled = Boolean.parseBoolean(
+        properties.getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
+            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
+    if (isSortTempFileCompressionEnabled) {
+      LOGGER.info("Compression was used while writing the sortTempFile");
+    }
+
+    int recordsPerChunk;
+    try {
+      recordsPerChunk = Integer.parseInt(
+          properties.getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
+              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
+      if (recordsPerChunk < 1) {
+        // parsable but not positive: fall back to the default
+        LOGGER.error("Invalid value for: "
+            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+            + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
+            + " be used");
+        recordsPerChunk = Integer.parseInt(
+            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+      }
+    } catch (NumberFormatException e) {
+      // not a number at all: fall back to the default
+      LOGGER.error(
+          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+              + ", only Positive Integer value is allowed.Default value will be used");
+      recordsPerChunk = Integer.parseInt(
+          CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+    }
+    this.sortTempFileNoOFRecordsInCompression = recordsPerChunk;
+
+    initialise();
+  }
+
+  /**
+   * Opens the sort temp file, reads the entry count stored at its start and loads the first
+   * chunk of rows when prefetch or sort temp file compression is enabled. With prefetch enabled
+   * a background fetch of the following chunk is scheduled when more rows remain.
+   * When any step fails, the stream opened here is closed before the exception is thrown so
+   * that a failed initialization does not leak the file handle.
+   *
+   * @throws CarbonSortKeyAndGroupByException if the file does not exist or cannot be read
+   */
+  private void initialise() throws CarbonSortKeyAndGroupByException {
+    boolean initialised = false;
+    try {
+      if (isSortTempFileCompressionEnabled) {
+        // with compression the chunk size is the configured number of records per chunk
+        this.bufferSize = sortTempFileNoOFRecordsInCompression;
+      }
+      stream = new DataInputStream(
+          new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
+      this.entryCount = stream.readInt();
+      if (prefetch) {
+        // first chunk is loaded synchronously, the next one in the background
+        new DataFetcher(false).call();
+        totalRecordFetch += currentBuffer.length;
+        if (totalRecordFetch < this.entryCount) {
+          submit = executorService.submit(new DataFetcher(true));
+        }
+      } else if (isSortTempFileCompressionEnabled) {
+        new DataFetcher(false).call();
+      }
+      initialised = true;
+    } catch (FileNotFoundException e) {
+      LOGGER.error(e);
+      throw new CarbonSortKeyAndGroupByException(tempFile + " No Found", e);
+    } catch (IOException e) {
+      // the file exists but could not be read, so do not report it as missing
+      LOGGER.error(e);
+      throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+    } finally {
+      if (!initialised && stream != null) {
+        CarbonUtil.closeStreams(stream);
+      }
+    }
+  }
+
+  /**
+   * Reads the next row and keeps it as the current row returned by {@link #getRow()}.
+   * In prefetch mode the row comes from the prefetch buffers, with sort temp file compression
+   * (and no prefetch) it comes from a chunk that is reloaded once it is used up, otherwise it
+   * is read directly from the stream.
+   *
+   * @throws CarbonSortKeyAndGroupByException problem while reading
+   */
+  public void readRow() throws CarbonSortKeyAndGroupByException {
+    if (prefetch) {
+      fillDataForPrefetch();
+    } else if (isSortTempFileCompressionEnabled) {
+      if (bufferRowCounter >= bufferSize) {
+        // DataFetcher sizes the next chunk from totalRecordFetch, which nothing else advances
+        // on this path. All rows loaded so far have been handed out at this point, so the
+        // processed count is the number of rows already read from the file. Without this the
+        // last, shorter chunk would be requested with the full size and read past the end.
+        totalRecordFetch = prefetchRecordsProceesed;
+        try {
+          new DataFetcher(false).call();
+          bufferRowCounter = 0;
+        } catch (Exception e) {
+          LOGGER.error(e);
+          throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+        }
+
+      }
+      prefetchRecordsProceesed++;
+      returnRow = currentBuffer[bufferRowCounter++];
+    } else {
+      this.returnRow = getRowFromStream();
+    }
+  }
+
+  /**
+   * Hands out the next row from the current buffer in prefetch mode. When the current buffer
+   * is used up, the chunk loaded by the background fetch becomes the current buffer and, if
+   * the file still holds unread rows, the next background fetch is scheduled.
+   *
+   * @throws CarbonSortKeyAndGroupByException if the background fetch failed, was interrupted
+   *                                          or did not deliver a chunk
+   */
+  private void fillDataForPrefetch() throws CarbonSortKeyAndGroupByException {
+    if (bufferRowCounter >= bufferSize) {
+      awaitBackupBuffer();
+      bufferRowCounter = 0;
+      currentBuffer = backupBuffer;
+      isBackupFilled = false;
+      totalRecordFetch += currentBuffer.length;
+      if (totalRecordFetch < this.entryCount) {
+        submit = executorService.submit(new DataFetcher(true));
+      }
+    }
+    prefetchRecordsProceesed++;
+    returnRow = currentBuffer[bufferRowCounter++];
+  }
+
+  /**
+   * Waits for the pending background fetch and verifies that it filled the backup buffer.
+   * The wait is done even when the fetch has already finished, because the fetch thread writes
+   * the backup buffer and its flag without any other synchronization.
+   *
+   * @throws CarbonSortKeyAndGroupByException if the fetch failed, the wait was interrupted or
+   *                                          no filled backup buffer is available
+   */
+  private void awaitBackupBuffer() throws CarbonSortKeyAndGroupByException {
+    if (submit != null) {
+      try {
+        // returns immediately when the fetch is already done
+        submit.get();
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          // keep the interrupt visible to the caller
+          Thread.currentThread().interrupt();
+        }
+        LOGGER.error(e);
+        throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+      }
+    }
+    if (!isBackupFilled) {
+      // continuing would hand out rows of a stale buffer, or fail on a missing one
+      throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading",
+          new IllegalStateException("No prefetched rows are available"));
+    }
+  }
+
+  /**
+   * Reads one row from the sort temp file stream.
+   * Dimensions are read first (an int for a dictionary dimension, a short length followed by
+   * that many bytes for a no-dictionary dimension), then the complex dimensions and finally
+   * the measures, each preceded by a flag byte that is 1 when a value follows.
+   *
+   * @return row holder of size 3 filled by NonDictionaryUtil.prepareOutObj
+   * @throws CarbonSortKeyAndGroupByException if reading from the stream fails
+   */
+  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
+    // row of size 3 (1 for dims , 1 for high card , 1 for measures)
+    Object[] row = new Object[3];
+    int[] dictionaryDims = new int[this.dimensionCount - this.noDictionaryCount];
+    byte[][] byteDims = new byte[this.noDictionaryCount + this.complexDimensionCount][];
+    Object[] measureValues = new Object[this.measureCount];
+    try {
+      int dictionaryPos = 0;
+      int bytePos = 0;
+
+      // dimension values
+      for (boolean noDictionary : isNoDictionaryDimensionColumn) {
+        if (noDictionary) {
+          byteDims[bytePos++] = readShortPrefixedBytes();
+        } else {
+          dictionaryDims[dictionaryPos++] = stream.readInt();
+        }
+      }
+
+      // complex dimension values are stored after the no-dictionary values
+      for (int remaining = complexDimensionCount; remaining > 0; remaining--) {
+        byteDims[bytePos++] = readShortPrefixedBytes();
+      }
+
+      // measure values, null when the flag byte is not 1
+      for (int i = 0; i < measureValues.length; i++) {
+        measureValues[i] = stream.readByte() == 1 ? readMeasure(aggType[i]) : null;
+      }
+
+      NonDictionaryUtil.prepareOutObj(row, dictionaryDims, byteDims, measureValues);
+
+      // one more record has been read from the stream
+      this.numberOfObjectRead++;
+    } catch (IOException e) {
+      LOGGER.error("Problme while reading the madkey fom sort temp file");
+      throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e);
+    }
+
+    return row;
+  }
+
+  /**
+   * Reads a short length and then that many bytes from the stream.
+   */
+  private byte[] readShortPrefixedBytes() throws IOException {
+    short length = stream.readShort();
+    byte[] bytes = new byte[length];
+    stream.readFully(bytes);
+    return bytes;
+  }
+
+  /**
+   * Reads one measure value of the given data type from the stream.
+   *
+   * @throws IllegalArgumentException for a data type other than SHORT, INT, LONG, DOUBLE or
+   *                                  DECIMAL
+   */
+  private Object readMeasure(DataType type) throws IOException {
+    switch (type) {
+      case SHORT:
+        return stream.readShort();
+      case INT:
+        return stream.readInt();
+      case LONG:
+        return stream.readLong();
+      case DOUBLE:
+        return stream.readDouble();
+      case DECIMAL:
+        // int length followed by the bytes of the decimal
+        byte[] decimalBytes = new byte[stream.readInt()];
+        stream.readFully(decimalBytes);
+        return DataTypeUtil.byteToBigDecimal(decimalBytes);
+      default:
+        throw new IllegalArgumentException("unsupported data type:" + type);
+    }
+  }
+
+  /**
+   * Returns the row loaded by the most recent call to {@link #readRow()}.
+   *
+   * @return current row
+   */
+  public Object[] getRow() {
+    return returnRow;
+  }
+
+  /**
+   * below method will be used to check whether any more records are present
+   * in file or not
+   *
+   * @return more row present in file
+   */
+  public boolean hasNext() {
+    if (prefetch || isSortTempFileCompressionEnabled) {
+      return this.prefetchRecordsProceesed < this.entryCount;
+    }
+    return this.numberOfObjectRead < this.entryCount;
+  }
+
+  /**
+   * Releases the resources of this holder: closes the input stream, shuts down the prefetch
+   * executor and drops the references to both row buffers.
+   */
+  public void closeStream() {
+    CarbonUtil.closeStreams(stream);
+    // shutdown() only stops new submissions; a fetch that was already submitted is not cancelled
+    executorService.shutdown();
+    this.backupBuffer = null;
+    this.currentBuffer = null;
+  }
+
+  /**
+   * Returns the number of entries of the temp file, as read from the file during
+   * initialization.
+   *
+   * @return entryCount
+   */
+  public int getEntryCount() {
+    return this.entryCount;
+  }
+
+  /**
+   * Orders two holders by the sort columns of their current rows. Sort columns are compared in
+   * order: a no-dictionary column by its bytes, any other column by its int value. The first
+   * column that differs decides the result.
+   *
+   * @param other holder to compare with; both holders must have a current row
+   * @return negative, zero or positive when the current row of this holder sorts before, equal
+   * to or after the current row of the other holder
+   */
+  @Override public int compareTo(SortTempFileChunkHolder other) {
+    int index = 0;
+    int noDictionaryIndex = 0;
+    int[] leftMdkArray = (int[]) returnRow[0];
+    int[] rightMdkArray = (int[]) other.returnRow[0];
+    byte[][] leftNonDictArray = (byte[][]) returnRow[1];
+    byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
+    for (boolean isNoDictionary : isNoDictionarySortColumn) {
+      int diff;
+      if (isNoDictionary) {
+        diff = UnsafeComparer.INSTANCE
+            .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);
+        noDictionaryIndex++;
+      } else {
+        // Integer.compare instead of subtraction: a difference can overflow and flip the sign
+        diff = Integer.compare(leftMdkArray[index], rightMdkArray[index]);
+        index++;
+      }
+      if (diff != 0) {
+        return diff;
+      }
+    }
+    // all sort columns are equal
+    return 0;
+  }
+
+  /**
+   * Two holders are equal only when they are the same instance.
+   */
+  @Override public boolean equals(Object obj) {
+    // any other object, whatever its type, is never equal to this holder
+    return this == obj;
+  }
+
+  /**
+   * Hash built from the column counts and the temp file.
+   */
+  @Override public int hashCode() {
+    int counts = measureCount + dimensionCount + complexDimensionCount + noDictionaryCount;
+    return 31 * counts + tempFile.hashCode();
+  }
+
+  /**
+   * Task that reads the next chunk of rows from the temp file into either the current buffer
+   * or the backup buffer. The chunk size is fixed when the task is created: the buffer size,
+   * or the number of rows not yet fetched when that is smaller.
+   */
+  private final class DataFetcher implements Callable<Void> {
+    /** true to fill the backup buffer, false to fill the current buffer */
+    private final boolean isBackUpFilling;
+
+    /** number of rows this task reads */
+    private final int numberOfRecords;
+
+    private DataFetcher(boolean backUp) {
+      isBackUpFilling = backUp;
+      numberOfRecords = Math.min(bufferSize, entryCount - totalRecordFetch);
+    }
+
+    /**
+     * Reads the chunk and stores it in the selected buffer.
+     *
+     * @return always null
+     * @throws Exception the failure raised while reading; it is logged and passed on so the
+     *                   caller does not continue with a buffer that was never filled
+     */
+    @Override public Void call() throws Exception {
+      try {
+        if (isBackUpFilling) {
+          backupBuffer = prefetchRecordsFromFile(numberOfRecords);
+          // set only after the buffer is in place
+          isBackupFilled = true;
+        } else {
+          currentBuffer = prefetchRecordsFromFile(numberOfRecords);
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        throw e;
+      }
+      return null;
+    }
+
+  }
+
+  /**
+   * Reads the given number of rows from the sort temp file, one row at a time.
+   *
+   * @param numberOfRecords number of rows to read
+   * @return the rows in the order they were read
+   * @throws CarbonSortKeyAndGroupByException if a row cannot be read
+   */
+  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
+      throws CarbonSortKeyAndGroupByException {
+    Object[][] chunk = new Object[numberOfRecords][];
+    int filled = 0;
+    while (filled < numberOfRecords) {
+      chunk[filled] = getRowFromStream();
+      filled++;
+    }
+    return chunk;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java
new file mode 100644
index 0000000..025aef8
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public class SortTempFileChunkWriter implements TempSortFileWriter {
+  /**
+   * writer
+   */
+  private TempSortFileWriter writer;
+
+  /**
+   * recordPerLeaf
+   */
+  private int recordPerLeaf;
+
+  /**
+   * CarbonCompressedSortTempFileChunkWriter
+   *
+   * @param writer
+   */
+  public SortTempFileChunkWriter(TempSortFileWriter writer, int recordPerLeaf) {
+    this.writer = writer;
+    this.recordPerLeaf = recordPerLeaf;
+  }
+
+  /**
+   * initialize
+   */
+  public void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException {
+    this.writer.initiaize(file, entryCount);
+  }
+
+  /**
+   * finish
+   */
+  public void finish() {
+    this.writer.finish();
+  }
+
+  /**
+   * Below method will be used to write the sort temp file chunk by chunk
+   */
+  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
+    int recordCount = 0;
+    Object[][] tempRecords;
+    while (recordCount < records.length) {
+      if (records.length - recordCount < recordPerLeaf) {
+        recordPerLeaf = records.length - recordCount;
+      }
+      tempRecords = new Object[recordPerLeaf][];
+      System.arraycopy(records, recordCount, tempRecords, 0, recordPerLeaf);
+      recordCount += recordPerLeaf;
+      this.writer.writeSortTempFile(tempRecords);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java
new file mode 100644
index 0000000..0de9af7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+/**
+ * Contract for reading the rows of a sort temp file.
+ */
+public interface TempSortFileReader {
+  /**
+   * Closes the file holder. Call it when reading is done.
+   */
+  void finish();
+
+  /**
+   * Returns rows of the temp file.
+   * NOTE(review): whether one call returns all rows or only the next batch is not visible
+   * from this interface - confirm against the implementations.
+   *
+   * @return rows, one Object[] per row
+   */
+  Object[][] getRow();
+
+  /**
+   * Returns the total row count of the temp file.
+   *
+   * @return total number of rows in the temp file
+   */
+  int getEntryCount();
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java
new file mode 100644
index 0000000..4e4a8e7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Contract for writing records to a sort temp file.
+ */
+public interface TempSortFileWriter {
+  /**
+   * Initializes the writer for the given file. The method name is misspelled ("initiaize");
+   * it is kept because implementations and callers use this name.
+   *
+   * @param file       sort temp file to write
+   * @param entryCount number of entries
+   * @throws CarbonSortKeyAndGroupByException if the writer cannot be initialized
+   */
+  void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException;
+
+  /**
+   * Finishes writing. Call it when no more records will be written.
+   */
+  void finish();
+
+  /**
+   * Writes the given records to the sort temp file.
+   *
+   * @param records records to write, one Object[] per record
+   * @throws CarbonSortKeyAndGroupByException if writing fails
+   */
+  void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java
new file mode 100644
index 0000000..259ab9f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+/**
+ * Singleton factory that creates the writer for sort temp files.
+ */
+public final class TempSortFileWriterFactory {
+  private static final TempSortFileWriterFactory WRITERFACTORY = new TempSortFileWriterFactory();
+
+  private TempSortFileWriterFactory() {
+    // instances are handed out through getInstance() only
+  }
+
+  /**
+   * @return the single factory instance
+   */
+  public static TempSortFileWriterFactory getInstance() {
+    return WRITERFACTORY;
+  }
+
+  /**
+   * Creates a sort temp file writer.
+   *
+   * @param isCompressionEnabled  true for a compressed writer, false for an uncompressed one
+   * @param dimensionCount        passed to the writer
+   * @param complexDimensionCount passed to the writer
+   * @param measureCount          passed to the writer
+   * @param noDictionaryCount     passed to the writer
+   * @param writeBufferSize       passed to the writer
+   * @return a new writer of the selected kind
+   */
+  public TempSortFileWriter getTempSortFileWriter(boolean isCompressionEnabled, int dimensionCount,
+      int complexDimensionCount, int measureCount, int noDictionaryCount, int writeBufferSize) {
+    if (!isCompressionEnabled) {
+      return new UnCompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount,
+          noDictionaryCount, writeBufferSize);
+    }
+    return new CompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount,
+        noDictionaryCount, writeBufferSize);
+  }
+}