Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/13 11:14:57 UTC

[2/4] incubator-carbondata git commit: Added unsafe on-heap/off-heap sort to improve loading performance

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
new file mode 100644
index 0000000..d512349
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
+
+public class UnsafeInmemoryHolder implements SortTempChunkHolder {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeInmemoryHolder.class.getName());
+
+  private int counter;
+
+  private int actualSize;
+
+  private UnsafeCarbonRowPage rowPage;
+
+  private Object[] currentRow;
+
+  private long address;
+
+  private NewRowComparator comparator;
+
+  private int columnSize;
+
+  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize) {
+    this.actualSize = rowPage.getBuffer().getActualSize();
+    this.rowPage = rowPage;
+    LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
+    this.comparator = new NewRowComparator(rowPage.getNoDictionaryDimensionMapping());
+    this.columnSize = columnSize;
+  }
+
+  public boolean hasNext() {
+    return counter < actualSize;
+  }
+
+  public void readRow() {
+    currentRow = new Object[columnSize];
+    address = rowPage.getBuffer().get(counter);
+    rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow);
+    counter++;
+  }
+
+  public Object[] getRow() {
+    return currentRow;
+  }
+
+  @Override public int compareTo(SortTempChunkHolder o) {
+    return comparator.compare(currentRow, o.getRow());
+  }
+
+  public int numberOfRows() {
+    return actualSize;
+  }
+
+  public void close() {
+    rowPage.freeMemory();
+  }
+}
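
A minimal consumer sketch of the SortTempChunkHolder contract the holders in
this patch implement (the loop itself is hypothetical, not part of the patch):
hasNext() checks for remaining rows, readRow() advances the cursor, getRow()
exposes the current row, and close() frees the backing page.

    SortTempChunkHolder holder = new UnsafeInmemoryHolder(rowPage, columnSize);
    while (holder.hasNext()) {
      holder.readRow();                  // advance to the next row
      Object[] row = holder.getRow();    // valid until the next readRow()
      // ... consume row ...
    }
    holder.close();                      // frees the underlying row page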

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
new file mode 100644
index 0000000..9f157a0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
+
+/**
+ * It is used for merging unsafe inmemory intermediate data
+ */
+public class UnsafeInmemoryMergeHolder implements Comparable<UnsafeInmemoryMergeHolder> {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeInmemoryMergeHolder.class.getName());
+
+  private int counter;
+
+  private int actualSize;
+
+  private UnsafeCarbonRowPage rowPage;
+
+  private UnsafeCarbonRowForMerge currentRow;
+
+  private long address;
+
+  private UnsafeRowComparator comparator;
+
+  private Object baseObject;
+
+  public UnsafeInmemoryMergeHolder(UnsafeCarbonRowPage rowPage, byte index) {
+    this.actualSize = rowPage.getBuffer().getActualSize();
+    this.rowPage = rowPage;
+    LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
+    this.comparator = new UnsafeRowComparator(rowPage);
+    this.baseObject = rowPage.getDataBlock().getBaseObject();
+    currentRow = new UnsafeCarbonRowForMerge();
+    currentRow.index = index;
+  }
+
+  public boolean hasNext() {
+    return counter < actualSize;
+  }
+
+  public void readRow() {
+    address = rowPage.getBuffer().get(counter);
+    currentRow.address = address + rowPage.getDataBlock().getBaseOffset();
+    counter++;
+  }
+
+  public UnsafeCarbonRowForMerge getRow() {
+    return currentRow;
+  }
+
+  @Override public int compareTo(UnsafeInmemoryMergeHolder o) {
+    return comparator.compare(currentRow, baseObject, o.getRow(), o.getBaseObject());
+  }
+
+  public int numberOfRows() {
+    return actualSize;
+  }
+
+  public Object getBaseObject() {
+    return baseObject;
+  }
+
+  public void close() {
+    rowPage.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
new file mode 100644
index 0000000..30ef9ee
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+
+public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeSortTempFileChunkHolder.class.getName());
+
+  /**
+   * temp file
+   */
+  private File tempFile;
+
+  /**
+   * read stream
+   */
+  private DataInputStream stream;
+
+  /**
+   * entry count
+   */
+  private int entryCount;
+
+  /**
+   * return row
+   */
+  private Object[] returnRow;
+
+  /**
+   * number of measures
+   */
+  private int measureCount;
+
+  /**
+   * number of dimension columns
+   */
+  private int dimensionCount;
+
+  /**
+   * number of complex dimension columns
+   */
+  private int complexDimensionCount;
+
+  /**
+   * fileBufferSize for file reader stream size
+   */
+  private int fileBufferSize;
+
+  private Object[][] currentBuffer;
+
+  private Object[][] backupBuffer;
+
+  private boolean isBackupFilled;
+
+  private boolean prefetch;
+
+  private int bufferSize;
+
+  private int bufferRowCounter;
+
+  private ExecutorService executorService;
+
+  private Future<Void> submit;
+
+  private int prefetchRecordsProceesed;
+
+  /**
+   * number of records per compressed block in the sort temp file
+   */
+  private int sortTempFileNoOFRecordsInCompression;
+
+  /**
+   * whether sort temp file compression is enabled
+   */
+  private boolean isSortTempFileCompressionEnabled;
+
+  /**
+   * total number of records fetched so far
+   */
+  private int totalRecordFetch;
+
+  private int noDictionaryCount;
+
+  private char[] aggType;
+
+  private int numberOfObjectRead;
+  /**
+   * to store whether dimension is of dictionary type or not
+   */
+  private boolean[] isNoDictionaryDimensionColumn;
+
+  private int nullSetWordsLength;
+
+  private Comparator<Object[]> comparator;
+
+  /**
+   * Constructor to initialize
+   */
+  public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
+    // set temp file
+    this.tempFile = tempFile;
+
+    // set measure and dimension count
+    this.measureCount = parameters.getMeasureColCount();
+    this.dimensionCount = parameters.getDimColCount();
+    this.complexDimensionCount = parameters.getComplexDimColCount();
+
+    this.noDictionaryCount = parameters.getNoDictionaryCount();
+    // set file buffer size
+    this.fileBufferSize = parameters.getFileBufferSize();
+    this.executorService = Executors.newFixedThreadPool(1);
+    this.aggType = parameters.getAggType();
+    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+    this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
+    comparator = new NewRowComparator(isNoDictionaryDimensionColumn);
+    initialize();
+  }
+
+  /**
+   * This method will be used to initialize
+   */
+  public void initialize() {
+    prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
+            CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
+    bufferSize = CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE;
+    this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
+            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
+    if (this.isSortTempFileCompressionEnabled) {
+      LOGGER.info("Compression was used while writing the sortTempFile");
+    }
+
+    try {
+      this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
+              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
+      if (this.sortTempFileNoOFRecordsInCompression < 1) {
+        LOGGER.error("Invalid value for: "
+            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+            + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
+            + " be used");
+
+        this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
+            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error(
+          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+              + ", only Positive Integer value is allowed.Default value will be used");
+      this.sortTempFileNoOFRecordsInCompression = Integer
+          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+    }
+
+    initialise();
+  }
+
+  private void initialise() {
+    try {
+      if (isSortTempFileCompressionEnabled) {
+        this.bufferSize = sortTempFileNoOFRecordsInCompression;
+      }
+      stream = new DataInputStream(
+          new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
+      this.entryCount = stream.readInt();
+      LOGGER.audit("Processing unsafe mode file rows with size : " + entryCount);
+      if (prefetch) {
+        new DataFetcher(false).call();
+        totalRecordFetch += currentBuffer.length;
+        if (totalRecordFetch < this.entryCount) {
+          submit = executorService.submit(new DataFetcher(true));
+        }
+      } else {
+        if (isSortTempFileCompressionEnabled) {
+          new DataFetcher(false).call();
+        }
+      }
+
+    } catch (FileNotFoundException e) {
+      LOGGER.error(e);
+      throw new RuntimeException(tempFile + " not found", e);
+    } catch (IOException e) {
+      LOGGER.error(e);
+      throw new RuntimeException("Problem while reading " + tempFile, e);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new RuntimeException("Problem while reading " + tempFile, e);
+    }
+  }
+
+  /**
+   * This method will be used to read new row from file
+   *
+   * @throws CarbonSortKeyAndGroupByException problem while reading
+   */
+  public void readRow() throws CarbonSortKeyAndGroupByException {
+    if (prefetch) {
+      fillDataForPrefetch();
+    } else if (isSortTempFileCompressionEnabled) {
+      if (bufferRowCounter >= bufferSize) {
+        try {
+          new DataFetcher(false).call();
+          bufferRowCounter = 0;
+        } catch (Exception e) {
+          LOGGER.error(e);
+          throw new CarbonSortKeyAndGroupByException("Problem while reading " + tempFile, e);
+        }
+
+      }
+      prefetchRecordsProceesed++;
+      returnRow = currentBuffer[bufferRowCounter++];
+    } else {
+      this.returnRow = getRowFromStream();
+      // count rows read in the plain (non-prefetch, uncompressed) path so
+      // hasNext() can terminate against entryCount
+      numberOfObjectRead++;
+    }
+  }
+
+  private void fillDataForPrefetch() {
+    if (bufferRowCounter >= bufferSize) {
+      if (isBackupFilled) {
+        bufferRowCounter = 0;
+        currentBuffer = backupBuffer;
+        totalRecordFetch += currentBuffer.length;
+        isBackupFilled = false;
+        if (totalRecordFetch < this.entryCount) {
+          submit = executorService.submit(new DataFetcher(true));
+        }
+      } else {
+        try {
+          submit.get();
+        } catch (Exception e) {
+          LOGGER.error(e);
+        }
+        bufferRowCounter = 0;
+        currentBuffer = backupBuffer;
+        isBackupFilled = false;
+        totalRecordFetch += currentBuffer.length;
+        if (totalRecordFetch < this.entryCount) {
+          submit = executorService.submit(new DataFetcher(true));
+        }
+      }
+    }
+    prefetchRecordsProceesed++;
+    returnRow = currentBuffer[bufferRowCounter++];
+  }
+
+  /**
+   * Reads one row from the sort temp file stream.
+   *
+   * @return the row read from the stream
+   * @throws CarbonSortKeyAndGroupByException if reading from the stream fails
+   */
+  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
+    Object[] row = new Object[dimensionCount + measureCount];
+    try {
+      int dimCount = 0;
+      for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+        if (isNoDictionaryDimensionColumn[dimCount]) {
+          short aShort = stream.readShort();
+          byte[] col = new byte[aShort];
+          stream.readFully(col);
+          row[dimCount] = col;
+        } else {
+          int anInt = stream.readInt();
+          row[dimCount] = anInt;
+        }
+      }
+
+      // read complex dimensions here.
+      for (; dimCount < dimensionCount; dimCount++) {
+        short aShort = stream.readShort();
+        byte[] col = new byte[aShort];
+        stream.readFully(col);
+        row[dimCount] = col;
+      }
+
+      long[] words = new long[nullSetWordsLength];
+      for (int i = 0; i < words.length; i++) {
+        words[i] = stream.readLong();
+      }
+
+      for (int mesCount = 0; mesCount < measureCount; mesCount++) {
+        if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
+          if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+            row[dimensionCount + mesCount] = stream.readDouble();
+          } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
+            row[dimensionCount + mesCount] = stream.readLong();
+          } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+            short aShort = stream.readShort();
+            byte[] bigDecimalInBytes = new byte[aShort];
+            stream.readFully(bigDecimalInBytes);
+            row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+          }
+        }
+      }
+      return row;
+    } catch (Exception e) {
+      throw new CarbonSortKeyAndGroupByException(e);
+    }
+  }
+
+  /**
+   * below method will be used to get the row
+   *
+   * @return row
+   */
+  public Object[] getRow() {
+    return this.returnRow;
+  }
+
+  /**
+   * below method will be used to check whether any more records are present
+   * in file or not
+   *
+   * @return whether more rows are present in the file
+   */
+  public boolean hasNext() {
+    if (prefetch || isSortTempFileCompressionEnabled) {
+      return this.prefetchRecordsProceesed < this.entryCount;
+    }
+    return this.numberOfObjectRead < this.entryCount;
+  }
+
+  /**
+   * Below method will be used to close streams
+   */
+  public void close() {
+    CarbonUtil.closeStreams(stream);
+    executorService.shutdown();
+  }
+
+  /**
+   * This method will return the number of entries
+   *
+   * @return entryCount
+   */
+  public int numberOfRows() {
+    return entryCount;
+  }
+
+  @Override public int compareTo(SortTempChunkHolder other) {
+    return comparator.compare(returnRow, other.getRow());
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof UnsafeSortTempFileChunkHolder)) {
+      return false;
+    }
+    UnsafeSortTempFileChunkHolder o = (UnsafeSortTempFileChunkHolder) obj;
+
+    return this.compareTo(o) == 0;
+  }
+
+  @Override public int hashCode() {
+    int hash = 0;
+    hash += 31 * measureCount;
+    hash += 31 * dimensionCount;
+    hash += 31 * complexDimensionCount;
+    hash += 31 * noDictionaryCount;
+    hash += tempFile.hashCode();
+    return hash;
+  }
+
+  private final class DataFetcher implements Callable<Void> {
+    private boolean isBackUpFilling;
+
+    private int numberOfRecords;
+
+    private DataFetcher(boolean backUp) {
+      isBackUpFilling = backUp;
+      calculateNumberOfRecordsToBeFetched();
+    }
+
+    private void calculateNumberOfRecordsToBeFetched() {
+      int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch;
+      numberOfRecords =
+          bufferSize < numberOfRecordsLeftToBeRead ? bufferSize : numberOfRecordsLeftToBeRead;
+    }
+
+    @Override public Void call() throws Exception {
+      try {
+        if (isBackUpFilling) {
+          backupBuffer = prefetchRecordsFromFile(numberOfRecords);
+          isBackupFilled = true;
+        } else {
+          currentBuffer = prefetchRecordsFromFile(numberOfRecords);
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+      }
+      return null;
+    }
+
+  }
+
+  /**
+   * This method will read the records from sort temp file and keep it in a buffer
+   *
+   * @param numberOfRecords number of records to read into the buffer
+   * @return the buffered records
+   * @throws CarbonSortKeyAndGroupByException if reading from the stream fails
+   */
+  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
+      throws CarbonSortKeyAndGroupByException {
+    Object[][] records = new Object[numberOfRecords][];
+    for (int i = 0; i < numberOfRecords; i++) {
+      records[i] = getRowFromStream();
+    }
+    return records;
+  }
+}
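
The prefetch path above is a double-buffering scheme: rows are served from
currentBuffer while the single-threaded executor fills backupBuffer, and
fillDataForPrefetch() swaps the two once the current buffer is drained. A
self-contained sketch of the same pattern, with all names hypothetical and a
stand-in batch reader:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class DoubleBufferSketch {
      private final ExecutorService pool = Executors.newFixedThreadPool(1);
      private Object[][] current;          // batch being consumed
      private Future<Object[][]> pending;  // batch being prefetched
      private int cursor;

      public DoubleBufferSketch() {
        current = readBatch();             // first fill is synchronous
        pending = pool.submit(this::readBatch);
      }

      // Callers must guard with a hasNext()-style check against the total
      // row count, exactly as the holder above does with entryCount.
      public Object[] next() throws Exception {
        if (cursor >= current.length) {    // current batch drained: swap
          current = pending.get();         // block until the prefetch finishes
          cursor = 0;
          pending = pool.submit(this::readBatch);
        }
        return current[cursor++];
      }

      private Object[][] readBatch() {
        // stand-in for prefetchRecordsFromFile(numberOfRecords)
        return new Object[0][];
      }
    }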

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
new file mode 100644
index 0000000..0d36d90
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
+
+import java.util.AbstractQueue;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRowForMerge;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+
+public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeInMemoryIntermediateDataMerger.class.getName());
+
+  /**
+   * recordHolderHeap
+   */
+  private AbstractQueue<UnsafeInmemoryMergeHolder> recordHolderHeap;
+
+  /**
+   * holderCounter
+   */
+  private int holderCounter;
+
+  /**
+   * entryCount
+   */
+  private int entryCount;
+
+  private UnsafeCarbonRowPage[] unsafeCarbonRowPages;
+
+  private long[] mergedAddresses;
+
+  private byte[] rowPageIndexes;
+
+  /**
+   * UnsafeInMemoryIntermediateDataMerger Constructor
+   */
+  public UnsafeInMemoryIntermediateDataMerger(UnsafeCarbonRowPage[] unsafeCarbonRowPages,
+      int totalSize) {
+    this.holderCounter = unsafeCarbonRowPages.length;
+    this.unsafeCarbonRowPages = unsafeCarbonRowPages;
+    this.mergedAddresses = new long[totalSize];
+    this.rowPageIndexes = new byte[totalSize];
+    this.entryCount = 0;
+  }
+
+  @Override public Void call() throws Exception {
+    long intermediateMergeStartTime = System.currentTimeMillis();
+    int holderCounterConst = holderCounter;
+    try {
+      startSorting();
+      while (hasNext()) {
+        writeDataToMemory(next());
+      }
+      double intermediateMergeCostTime =
+          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
+      LOGGER.info("============================== Intermediate Merge of " + holderCounterConst
+          + " in-memory sort Cost Time: " + intermediateMergeCostTime + "(s)");
+    } catch (Exception e) {
+      LOGGER.error(e, "Problem while intermediate merging");
+    }
+    return null;
+  }
+
+  /**
+   * This method will be used to get the sorted record from the in-memory pages
+   *
+   * @return sorted record
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private UnsafeCarbonRowForMerge getSortedRecordFromMemory()
+      throws CarbonSortKeyAndGroupByException {
+    UnsafeCarbonRowForMerge row = null;
+
+    // poll the top object from the heap
+    // the heap maintains a binary tree ordered by the comparator we pass in;
+    // poll() always removes the root of the tree and then performs a
+    // trickle-down operation, whose complexity is log(n)
+    UnsafeInmemoryMergeHolder poll = this.recordHolderHeap.poll();
+
+    // get the row from the chunk
+    row = poll.getRow();
+
+    // check whether any more entries are present
+    if (!poll.hasNext()) {
+      // decrement the holder counter
+      --this.holderCounter;
+
+      // return row
+      return row;
+    }
+
+    // read new row
+    poll.readRow();
+
+    // add to heap
+    this.recordHolderHeap.add(poll);
+
+    // return row
+    return row;
+  }
+
+  /**
+   * Below method will be used to start the merging process. It will create
+   * the record holder heap, read the first record from each row page and
+   * initialize the heap
+   *
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private void startSorting() throws CarbonSortKeyAndGroupByException {
+    LOGGER.info("Number of row pages in intermediate merger: " + this.holderCounter);
+
+    // create record holder heap
+    createRecordHolderQueue(unsafeCarbonRowPages);
+
+    // iterate over file list and create chunk holder and add to heap
+    LOGGER.info("Started adding first record from row page");
+
+    UnsafeInmemoryMergeHolder unsafePageHolder = null;
+    byte index = 0;
+    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
+      // create chunk holder
+      unsafePageHolder = new UnsafeInmemoryMergeHolder(unsafeCarbonRowPage, index++);
+
+      // initialize
+      unsafePageHolder.readRow();
+
+      // add to heap
+      this.recordHolderHeap.add(unsafePageHolder);
+    }
+
+    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+  }
+
+  /**
+   * This method will be used to create the heap which will be used to hold
+   * the chunk of data
+   */
+  private void createRecordHolderQueue(UnsafeCarbonRowPage[] pages) {
+    // creating record holder heap
+    this.recordHolderHeap = new PriorityQueue<UnsafeInmemoryMergeHolder>(pages.length);
+  }
+
+  /**
+   * This method will be used to get the sorted row
+   *
+   * @return sorted row
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private UnsafeCarbonRowForMerge next() throws CarbonSortKeyAndGroupByException {
+    return getSortedRecordFromMemory();
+  }
+
+  /**
+   * This method will be used to check whether any more element is present or
+   * not
+   *
+   * @return more element is present
+   */
+  private boolean hasNext() {
+    return this.holderCounter > 0;
+  }
+
+  /**
+   * Below method will be used to record the merged row address and page index in memory
+   */
+  private void writeDataToMemory(UnsafeCarbonRowForMerge row) {
+    mergedAddresses[entryCount] = row.address;
+    rowPageIndexes[entryCount] = row.index;
+    entryCount++;
+  }
+
+  public int getEntryCount() {
+    return entryCount;
+  }
+
+  public UnsafeCarbonRowPage[] getUnsafeCarbonRowPages() {
+    return unsafeCarbonRowPages;
+  }
+
+  public long[] getMergedAddresses() {
+    return mergedAddresses;
+  }
+
+  public byte[] getRowPageIndexes() {
+    return rowPageIndexes;
+  }
+}
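
The loop in call() above is the classic heap-based k-way merge: poll the
holder whose current row is smallest, record that row, advance the holder and
re-insert it until every holder is exhausted. A self-contained sketch of the
same pattern over plain sorted arrays (all names hypothetical):

    import java.util.PriorityQueue;

    public class KWayMergeSketch {
      // One cursor per sorted source, ordered by its current element,
      // playing the role of UnsafeInmemoryMergeHolder.
      static final class Cursor implements Comparable<Cursor> {
        final int[] data;
        int pos;
        Cursor(int[] data) { this.data = data; }
        int current() { return data[pos]; }
        boolean hasNext() { return pos + 1 < data.length; }
        public int compareTo(Cursor o) { return Integer.compare(current(), o.current()); }
      }

      public static void main(String[] args) {
        int[][] sortedPages = { { 1, 4, 9 }, { 2, 3, 8 }, { 5, 6, 7 } };
        PriorityQueue<Cursor> heap = new PriorityQueue<>();
        for (int[] page : sortedPages) heap.add(new Cursor(page));
        while (!heap.isEmpty()) {
          Cursor top = heap.poll();        // smallest current element
          System.out.print(top.current() + " ");
          if (top.hasNext()) {             // advance and re-insert
            top.pos++;
            heap.add(top);
          }
        }
        // prints: 1 2 3 4 5 6 7 8 9
      }
    }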

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
new file mode 100644
index 0000000..735243e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.AbstractQueue;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CarbonUtilException;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriter;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriterFactory;
+
+public class UnsafeIntermediateFileMerger implements Callable<Void> {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeIntermediateFileMerger.class.getName());
+
+  /**
+   * recordHolderHeap
+   */
+  private AbstractQueue<SortTempChunkHolder> recordHolderHeap;
+
+  /**
+   * fileCounter
+   */
+  private int fileCounter;
+
+  /**
+   * stream
+   */
+  private DataOutputStream stream;
+
+  /**
+   * totalNumberOfRecords
+   */
+  private int totalNumberOfRecords;
+
+  /**
+   * writer
+   */
+  private TempSortFileWriter writer;
+
+  private SortParameters mergerParameters;
+
+  private File[] intermediateFiles;
+
+  private File outPutFile;
+
+  private boolean[] noDictionarycolumnMapping;
+
+  private long[] nullSetWords;
+
+  private ByteBuffer rowData;
+
+  /**
+   * UnsafeIntermediateFileMerger Constructor
+   */
+  public UnsafeIntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
+      File outPutFile) {
+    this.mergerParameters = mergerParameters;
+    this.fileCounter = intermediateFiles.length;
+    this.intermediateFiles = intermediateFiles;
+    this.outPutFile = outPutFile;
+    noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
+    this.nullSetWords = new long[((mergerParameters.getMeasureColCount() - 1) >> 6) + 1];
+    // allocate a 2 MB buffer per row; assumed large enough for any single row
+    rowData = ByteBuffer.allocate(2 * 1024 * 1024);
+  }
+
+  @Override public Void call() throws Exception {
+    long intermediateMergeStartTime = System.currentTimeMillis();
+    int fileConterConst = fileCounter;
+    boolean isFailed = false;
+    try {
+      startSorting();
+      initialize();
+      while (hasNext()) {
+        writeDataToFile(next());
+      }
+      double intermediateMergeCostTime =
+          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
+      LOGGER.info("============================== Intermediate Merge of " + fileConterConst
+          + " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
+    } catch (Exception e) {
+      LOGGER.error(e, "Problem while intermediate merging");
+      isFailed = true;
+    } finally {
+      CarbonUtil.closeStreams(this.stream);
+      if (null != writer) {
+        writer.finish();
+      }
+      if (!isFailed) {
+        try {
+          finish();
+        } catch (CarbonSortKeyAndGroupByException e) {
+          LOGGER.error(e, "Problem while deleting the merge file");
+        }
+      } else {
+        if (!outPutFile.delete()) {
+          LOGGER.error("Problem while deleting the merge file");
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * This method is responsible for initializing the out stream
+   *
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private void initialize() throws CarbonSortKeyAndGroupByException {
+    if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
+      try {
+        this.stream = new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(outPutFile),
+                mergerParameters.getFileWriteBufferSize()));
+        this.stream.writeInt(this.totalNumberOfRecords);
+      } catch (FileNotFoundException e) {
+        throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
+      } catch (IOException e) {
+        throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
+      }
+    } else {
+      writer = TempSortFileWriterFactory.getInstance()
+          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
+              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
+              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
+              mergerParameters.getFileWriteBufferSize());
+      writer.initiaize(outPutFile, totalNumberOfRecords);
+    }
+  }
+
+  /**
+   * This method will be used to get the sorted record from file
+   *
+   * @return sorted record
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
+    Object[] row = null;
+
+    // poll the top object from the heap
+    // the heap maintains a binary tree ordered by the comparator we pass in;
+    // poll() always removes the root of the tree and then performs a
+    // trickle-down operation, whose complexity is log(n)
+    SortTempChunkHolder poll = this.recordHolderHeap.poll();
+
+    // get the row from the chunk
+    row = poll.getRow();
+
+    // check whether any more entries are present
+    if (!poll.hasNext()) {
+      // if the chunk is empty then close the stream
+      poll.close();
+
+      // decrement the file counter
+      --this.fileCounter;
+
+      // return row
+      return row;
+    }
+
+    // read new row
+    poll.readRow();
+
+    // add to heap
+    this.recordHolderHeap.add(poll);
+
+    // return row
+    return row;
+  }
+
+  /**
+   * Below method will be used to start the merging process. It will get all
+   * the temp files present in the sort temp folder, create the record holder
+   * heap, read the first record from each file and initialize the heap
+   *
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private void startSorting() throws CarbonSortKeyAndGroupByException {
+    LOGGER.info("Number of temp file: " + this.fileCounter);
+
+    // create record holder heap
+    createRecordHolderQueue(intermediateFiles);
+
+    // iterate over file list and create chunk holder and add to heap
+    LOGGER.info("Started adding first record from each file");
+
+    SortTempChunkHolder sortTempFileChunkHolder = null;
+
+    for (File tempFile : intermediateFiles) {
+      // create chunk holder
+      sortTempFileChunkHolder = new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters);
+
+      sortTempFileChunkHolder.readRow();
+      this.totalNumberOfRecords += sortTempFileChunkHolder.numberOfRows();
+
+      // add to heap
+      this.recordHolderHeap.add(sortTempFileChunkHolder);
+    }
+
+    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+  }
+
+  /**
+   * This method will be used to create the heap which will be used to hold
+   * the chunk of data
+   *
+   * @param listFiles list of temp files
+   */
+  private void createRecordHolderQueue(File[] listFiles) {
+    // creating record holder heap
+    this.recordHolderHeap = new PriorityQueue<SortTempChunkHolder>(listFiles.length);
+  }
+
+  /**
+   * This method will be used to get the sorted row
+   *
+   * @return sorted row
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private Object[] next() throws CarbonSortKeyAndGroupByException {
+    return getSortedRecordFromFile();
+  }
+
+  /**
+   * This method will be used to check whether any more element is present or
+   * not
+   *
+   * @return more element is present
+   */
+  private boolean hasNext() {
+    return this.fileCounter > 0;
+  }
+
+  /**
+   * Below method will be used to write data to file
+   *
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
+    int dimCount = 0;
+    int size = 0;
+    char[] aggType = mergerParameters.getAggType();
+    for (; dimCount < noDictionarycolumnMapping.length; dimCount++) {
+      if (noDictionarycolumnMapping[dimCount]) {
+        byte[] col = (byte[]) row[dimCount];
+        rowData.putShort((short) col.length);
+        size += 2;
+        rowData.put(col);
+        size += col.length;
+      } else {
+        rowData.putInt((int) row[dimCount]);
+        size += 4;
+      }
+    }
+
+    // write complex dimensions here.
+    int dimensionSize = mergerParameters.getDimColCount();
+    int measureSize = mergerParameters.getMeasureColCount();
+    for (; dimCount < dimensionSize; dimCount++) {
+      byte[] col = (byte[]) row[dimCount];
+      rowData.putShort((short)col.length);
+      size += 2;
+      rowData.put(col);
+      size += col.length;
+    }
+    Arrays.fill(nullSetWords, 0);
+    int nullSetSize = nullSetWords.length * 8;
+    int nullLoc = size;
+    size += nullSetSize;
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      Object value = row[mesCount + dimensionSize];
+      if (null != value) {
+        if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+          Double val = (Double) value;
+          rowData.putDouble(size, val);
+          size += 8;
+        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
+          Long val = (Long) value;
+          rowData.putLong(size, val);
+          size += 8;
+        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+          BigDecimal val = (BigDecimal) value;
+          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+          rowData.putShort(size, (short)bigDecimalInBytes.length);
+          size += 2;
+          for (int i = 0; i < bigDecimalInBytes.length; i++) {
+            rowData.put(size++, bigDecimalInBytes[i]);
+          }
+        }
+        UnsafeCarbonRowPage.set(nullSetWords, mesCount);
+      } else {
+        UnsafeCarbonRowPage.unset(nullSetWords, mesCount);
+      }
+    }
+    for (int i = 0; i < nullSetWords.length; i++) {
+      rowData.putLong(nullLoc, nullSetWords[i]);
+      nullLoc += 8;
+    }
+    byte[] rowBytes = new byte[size];
+    rowData.position(0);
+    rowData.get(rowBytes);
+    stream.write(rowBytes);
+    rowData.clear();
+  }
+
+  private void finish() throws CarbonSortKeyAndGroupByException {
+    if (recordHolderHeap != null) {
+      int size = recordHolderHeap.size();
+      for (int i = 0; i < size; i++) {
+        recordHolderHeap.poll().close();
+      }
+    }
+    try {
+      CarbonUtil.deleteFiles(intermediateFiles);
+      rowData.clear();
+    } catch (CarbonUtilException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
+    }
+  }
+}
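
The nullSetWords array above is a per-measure null bitmap: one bit per
measure, packed 64 bits to a long, which is why ((measureColCount - 1) >> 6) + 1
words are allocated. A sketch of the bit operations involved; these are
assumed to match UnsafeCarbonRowPage.set/unset/isSet, whose source is not
part of this patch:

    // word index is i >> 6, bit position within the word is i & 63
    static void set(long[] words, int i) {
      words[i >> 6] |= (1L << (i & 63));      // mark measure i as non-null
    }

    static void unset(long[] words, int i) {
      words[i >> 6] &= ~(1L << (i & 63));     // mark measure i as null
    }

    static boolean isSet(long[] words, int i) {
      return (words[i >> 6] & (1L << (i & 63))) != 0;
    }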

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
new file mode 100644
index 0000000..1cb2336
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+
+/**
+ * It merge-sorts intermediate files into a bigger file.
+ */
+public class UnsafeIntermediateMerger {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeIntermediateMerger.class.getName());
+
+  /**
+   * executorService
+   */
+  private ExecutorService executorService;
+  /**
+   * rowPages
+   */
+  private List<UnsafeCarbonRowPage> rowPages;
+
+  private List<UnsafeInMemoryIntermediateDataMerger> mergedPages;
+
+  private SortParameters parameters;
+
+  private final Object lockObject = new Object();
+
+  private boolean offHeap;
+
+  private List<File> procFiles;
+
+  public UnsafeIntermediateMerger(SortParameters parameters) {
+    this.parameters = parameters;
+    // in-memory row pages pending merge
+    this.rowPages = new ArrayList<UnsafeCarbonRowPage>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    this.mergedPages = new ArrayList<>();
+    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
+    this.offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+            CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+    this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+  }
+
+  public void addDataChunkToMerge(UnsafeCarbonRowPage rowPage) {
+    // add the row page to the list. When the list size reaches the configured
+    // threshold, intermediate merging of in-memory pages will be triggered
+    synchronized (lockObject) {
+      rowPages.add(rowPage);
+    }
+  }
+
+  public void addFileToMerge(File sortTempFile) {
+    // add the sort temp filename to the list. When the list size reaches the
+    // configured threshold, intermediate merging of sort temp files will be triggered
+    synchronized (lockObject) {
+      procFiles.add(sortTempFile);
+    }
+  }
+
+  public void startFileMergingIfPossible() {
+    File[] fileList;
+    if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
+      synchronized (lockObject) {
+        fileList = procFiles.toArray(new File[procFiles.size()]);
+        this.procFiles = new ArrayList<File>();
+      }
+      LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
+      startIntermediateMerging(fileList);
+    }
+  }
+
+  /**
+   * Below method will be used to start the intermediate file merging
+   *
+   * @param intermediateFiles
+   */
+  private void startIntermediateMerging(File[] intermediateFiles) {
+    File file = new File(
+        parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System
+            .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
+    UnsafeIntermediateFileMerger merger =
+        new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
+    executorService.submit(merger);
+  }
+
+  public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException {
+    UnsafeCarbonRowPage[] localRowPages;
+    if (rowPages.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
+      int totalRows = 0;
+      synchronized (lockObject) {
+        totalRows = getTotalNumberOfRows(rowPages);
+        if (totalRows <= 0) {
+          return;
+        }
+        localRowPages = rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]);
+        this.rowPages = new ArrayList<>();
+      }
+      LOGGER.debug("Sumitting request for intermediate merging of in-memory pages : "
+          + localRowPages.length);
+      startIntermediateMerging(localRowPages, totalRows);
+    }
+  }
+
+  /**
+   * Below method will be used to start the intermediate file merging
+   *
+   * @param rowPages
+   */
+  private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int totalRows)
+      throws CarbonSortKeyAndGroupByException {
+    UnsafeInMemoryIntermediateDataMerger merger =
+        new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows);
+    mergedPages.add(merger);
+    executorService.submit(merger);
+  }
+
+  private int getTotalNumberOfRows(List<UnsafeCarbonRowPage> unsafeCarbonRowPages) {
+    int totalSize = 0;
+    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
+      totalSize += unsafeCarbonRowPage.getBuffer().getActualSize();
+    }
+    return totalSize;
+  }
+
+  public void finish() throws CarbonSortKeyAndGroupByException {
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
+    }
+  }
+
+  public void close() {
+    if (!executorService.isShutdown()) {
+      executorService.shutdownNow();
+    }
+    rowPages.clear();
+    rowPages = null;
+  }
+
+  public List<UnsafeCarbonRowPage> getRowPages() {
+    return rowPages;
+  }
+
+  public List<UnsafeInMemoryIntermediateDataMerger> getMergedPages() {
+    return mergedPages;
+  }
+}
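
A hypothetical caller sequence for the merger above (parameters and
flushedFiles are placeholders, not part of this patch): each flushed sort
temp file is registered, and the merger itself decides when the configured
threshold (getNumberOfIntermediateFileToBeMerged) has been reached.

    UnsafeIntermediateMerger merger = new UnsafeIntermediateMerger(parameters);
    for (File sortTempFile : flushedFiles) {
      merger.addFileToMerge(sortTempFile);
      merger.startFileMergingIfPossible();  // no-op until the threshold is met
    }
    merger.finish();                        // wait for submitted merge tasks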

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
new file mode 100644
index 0000000..a142823
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.AbstractQueue;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeFinalMergePageHolder;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
+
+public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeSingleThreadFinalSortFilesMerger.class.getName());
+
+  /**
+   * lockObject
+   */
+  private static final Object LOCKOBJECT = new Object();
+
+  /**
+   * fileCounter
+   */
+  private int fileCounter;
+
+  /**
+   * recordHolderHeap
+   */
+  private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;
+
+  private SortParameters parameters;
+
+  /**
+   * number of measures
+   */
+  private int measureCount;
+
+  /**
+   * number of dimension columns
+   */
+  private int dimensionCount;
+
+  /**
+   * number of no dictionary columns
+   */
+  private int noDictionaryCount;
+
+  private int complexDimensionCount;
+
+  private boolean[] isNoDictionaryDimensionColumn;
+
+  /**
+   * tempFileLocation
+   */
+  private String tempFileLocation;
+
+  private String tableName;
+
+  public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters) {
+    this.parameters = parameters;
+    // set measure and dimension count
+    this.measureCount = parameters.getMeasureColCount();
+    this.dimensionCount = parameters.getDimColCount();
+    this.complexDimensionCount = parameters.getComplexDimColCount();
+
+    this.noDictionaryCount = parameters.getNoDictionaryCount();
+    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+    this.tempFileLocation = parameters.getTempFileLocation();
+    this.tableName = parameters.getTableName();
+  }
+
+  /**
+   * This method will be used to merge the merged files
+   *
+   */
+  public void startFinalMerge(UnsafeCarbonRowPage[] rowPages,
+      List<UnsafeInMemoryIntermediateDataMerger> merges) throws CarbonDataWriterException {
+    startSorting(rowPages, merges);
+  }
+
+  /**
+   * Below method will be used to start the merging process. It will collect
+   * the in-memory row pages, merged pages and sort temp files, create the
+   * record holder heap, read the first record from each source and
+   * initialize the heap
+   *
+   */
+  private void startSorting(UnsafeCarbonRowPage[] rowPages,
+      List<UnsafeInMemoryIntermediateDataMerger> merges) throws CarbonDataWriterException {
+    try {
+      File[] filesToMergeSort = getFilesToMergeSort();
+      this.fileCounter = rowPages.length + filesToMergeSort.length + merges.size();
+
+      LOGGER.info("Number of row pages: " + this.fileCounter);
+
+      // create record holder heap
+      createRecordHolderQueue();
+
+      // iterate over file list and create chunk holder and add to heap
+      LOGGER.info("Started adding first record from each page");
+      for (final UnsafeCarbonRowPage rowPage : rowPages) {
+
+        SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
+            parameters.getDimColCount() + parameters.getMeasureColCount());
+
+        // initialize
+        sortTempFileChunkHolder.readRow();
+
+        recordHolderHeapLocal.add(sortTempFileChunkHolder);
+      }
+
+      for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
+
+        SortTempChunkHolder sortTempFileChunkHolder =
+            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionaryDimnesionColumn(),
+                parameters.getDimColCount() + parameters.getMeasureColCount());
+
+        // initialize
+        sortTempFileChunkHolder.readRow();
+
+        recordHolderHeapLocal.add(sortTempFileChunkHolder);
+      }
+
+      for (final File file : filesToMergeSort) {
+
+        SortTempChunkHolder sortTempFileChunkHolder =
+            new UnsafeSortTempFileChunkHolder(file, parameters);
+
+        // initialize
+        sortTempFileChunkHolder.readRow();
+
+        recordHolderHeapLocal.add(sortTempFileChunkHolder);
+      }
+
+      LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new CarbonDataWriterException(e.getMessage());
+    }
+  }
+
+  private File[] getFilesToMergeSort() {
+    // get all the merged files
+    File file = new File(tempFileLocation);
+
+    File[] fileList = file.listFiles(new FileFilter() {
+      public boolean accept(File pathname) {
+        return pathname.getName().startsWith(tableName);
+      }
+    });
+
+    if (null == fileList || fileList.length == 0) {
+      return new File[0];
+    }
+    return fileList;
+  }
+
+  /**
+   * This method will be used to create the heap which will be used to hold
+   * the chunk of data
+   */
+  private void createRecordHolderQueue() {
+    // creating record holder heap
+    this.recordHolderHeapLocal = new PriorityQueue<SortTempChunkHolder>(fileCounter);
+  }
+
+  /**
+   * This method will be used to get the sorted row
+   *
+   * @return sorted row
+   */
+  public Object[] next() {
+    return convertRow(getSortedRecordFromFile());
+  }
+
+  /**
+   * This method will be used to get the sorted record from file
+   *
+   * @return sorted record
+   */
+  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
+    Object[] row = null;
+
+    // poll the top object from the heap
+    // the heap maintains a binary tree ordered by the comparator we pass in;
+    // poll() always removes the root of the tree and then performs a
+    // trickle-down operation, whose complexity is log(n)
+    SortTempChunkHolder poll = this.recordHolderHeapLocal.poll();
+
+    // get the row from the chunk
+    row = poll.getRow();
+
+    // check whether any more entries are present
+    if (!poll.hasNext()) {
+      // if the chunk is empty then close the stream
+      poll.close();
+
+      // decrement the file counter
+      --this.fileCounter;
+
+      // return row
+      return row;
+    }
+
+    // read new row
+    try {
+      poll.readRow();
+    } catch (Exception e) {
+      throw new CarbonDataWriterException(e.getMessage(), e);
+    }
+
+    // add to heap
+    this.recordHolderHeapLocal.add(poll);
+
+    // return row
+    return row;
+  }
+
+  /**
+   * This method will be used to check whether any more element is present or
+   * not
+   *
+   * @return more element is present
+   */
+  public boolean hasNext() {
+    return this.fileCounter > 0;
+  }
+
+  private Object[] convertRow(Object[] data) {
+    // create new row of size 3 (1 for dims, 1 for high card, 1 for measures)
+
+    Object[] holder = new Object[3];
+    int index = 0;
+    int nonDicIndex = 0;
+    int allCount = 0;
+    int[] dim = new int[this.dimensionCount];
+    byte[][] nonDicArray = new byte[this.noDictionaryCount + this.complexDimensionCount][];
+    Object[] measures = new Object[this.measureCount];
+    try {
+      // read dimension values
+      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+        if (isNoDictionaryDimensionColumn[i]) {
+          nonDicArray[nonDicIndex++] = (byte[]) data[i];
+        } else {
+          dim[index++] = (int) data[allCount];
+        }
+        allCount++;
+      }
+
+      for (int i = 0; i < complexDimensionCount; i++) {
+        nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
+        allCount++;
+      }
+
+      index = 0;
+      // read measure values
+      for (int i = 0; i < this.measureCount; i++) {
+        measures[index++] = data[allCount];
+        allCount++;
+      }
+
+      RemoveDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
+
+    } catch (Exception e) {
+      throw new RuntimeException("Problem while converting row ", e);
+    }
+
+    // return out row
+    return holder;
+  }
+
+  public void clear() {
+    if (null != recordHolderHeapLocal) {
+      for (SortTempChunkHolder pageHolder : recordHolderHeapLocal) {
+        pageHolder.close();
+      }
+      recordHolderHeapLocal = null;
+    }
+  }
+}
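
convertRow() above reshapes a flat sorted row into the three-slot layout the
downstream writer expects: slot 0 holds an int[] of dictionary surrogate
keys, slot 1 a byte[][] of no-dictionary and complex dimension bytes, and
slot 2 an Object[] of measure values. A self-contained sketch with
hypothetical sample data:

    // flat row: [dim0 (dictionary), dim1 (no-dictionary), measure0]
    boolean[] noDict = { false, true };
    Object[] flat = { 42, new byte[] { 1, 2 }, 3.14d };

    int[] dims = new int[1];               // dictionary surrogate keys
    byte[][] nonDict = new byte[1][];      // no-dictionary/complex bytes
    Object[] measures = new Object[1];     // measure values

    int dimIdx = 0, rawIdx = 0, all = 0;
    for (boolean raw : noDict) {
      if (raw) {
        nonDict[rawIdx++] = (byte[]) flat[all];
      } else {
        dims[dimIdx++] = (int) flat[all];
      }
      all++;
    }
    measures[0] = flat[all];

    Object[] holder = { dims, nonDict, measures };  // the 3-slot output row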