You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:22 UTC

[35/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
new file mode 100644
index 0000000..14ab838
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed
+ */
+public class UnsafeCarbonRowPage {
+
+  private boolean[] noDictionaryDimensionMapping;
+
+  private boolean[] noDictionarySortColumnMapping;
+
+  private int dimensionSize;
+
+  private int measureSize;
+
+  private DataType[] measureDataType;
+
+  private long[] nullSetWords;
+
+  private IntPointerBuffer buffer;
+
+  private int lastSize;
+
+  private long sizeToBeUsed;
+
+  private MemoryBlock dataBlock;
+
+  private boolean saveToDisk;
+
+  private MemoryManagerType managerType;
+
+  private long taskId;
+
+  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
+      MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
+    this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+    this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
+    this.dimensionSize = dimensionSize;
+    this.measureSize = measureSize;
+    this.measureDataType = type;
+    this.saveToDisk = saveToDisk;
+    this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
+    this.taskId = taskId;
+    buffer = new IntPointerBuffer(this.taskId);
+    this.dataBlock = memoryBlock;
+    // TODO Only using 98% of space for safe side.May be we can have different logic.
+    sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
+    this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
+  }
+
+  public int addRow(Object[] row) {
+    int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
+    buffer.set(lastSize);
+    lastSize = lastSize + size;
+    return size;
+  }
+
+  private int addRow(Object[] row, long address) {
+    if (row == null) {
+      throw new RuntimeException("Row is null ??");
+    }
+    int dimCount = 0;
+    int size = 0;
+    Object baseObject = dataBlock.getBaseObject();
+    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+      if (noDictionaryDimensionMapping[dimCount]) {
+        byte[] col = (byte[]) row[dimCount];
+        CarbonUnsafe.getUnsafe()
+            .putShort(baseObject, address + size, (short) col.length);
+        size += 2;
+        CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+            address + size, col.length);
+        size += col.length;
+      } else {
+        int value = (int) row[dimCount];
+        CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
+        size += 4;
+      }
+    }
+
+    // write complex dimensions here.
+    for (; dimCount < dimensionSize; dimCount++) {
+      byte[] col = (byte[]) row[dimCount];
+      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) col.length);
+      size += 2;
+      CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+          address + size, col.length);
+      size += col.length;
+    }
+    Arrays.fill(nullSetWords, 0);
+    int nullSetSize = nullSetWords.length * 8;
+    int nullWordLoc = size;
+    size += nullSetSize;
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      Object value = row[mesCount + dimensionSize];
+      if (null != value) {
+        switch (measureDataType[mesCount]) {
+          case SHORT:
+            Short sval = (Short) value;
+            CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
+            size += 2;
+            break;
+          case INT:
+            Integer ival = (Integer) value;
+            CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
+            size += 4;
+            break;
+          case LONG:
+            Long val = (Long) value;
+            CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
+            size += 8;
+            break;
+          case DOUBLE:
+            Double doubleVal = (Double) value;
+            CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
+            size += 8;
+            break;
+          case DECIMAL:
+            BigDecimal decimalVal = (BigDecimal) value;
+            byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+            CarbonUnsafe.getUnsafe().putShort(baseObject, address + size,
+                (short) bigDecimalInBytes.length);
+            size += 2;
+            CarbonUnsafe.getUnsafe()
+                .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+                    address + size, bigDecimalInBytes.length);
+            size += bigDecimalInBytes.length;
+            break;
+          default:
+            throw  new IllegalArgumentException("unsupported data type:" +
+                measureDataType[mesCount]);
+        }
+        set(nullSetWords, mesCount);
+      } else {
+        unset(nullSetWords, mesCount);
+      }
+    }
+    CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
+        address + nullWordLoc, nullSetSize);
+    return size;
+  }
+
+  public Object[] getRow(long address, Object[] rowToFill) {
+    int dimCount = 0;
+    int size = 0;
+
+    Object baseObject = dataBlock.getBaseObject();
+    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+      if (noDictionaryDimensionMapping[dimCount]) {
+        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+        byte[] col = new byte[aShort];
+        size += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                col.length);
+        size += col.length;
+        rowToFill[dimCount] = col;
+      } else {
+        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+        size += 4;
+        rowToFill[dimCount] = anInt;
+      }
+    }
+
+    // write complex dimensions here.
+    for (; dimCount < dimensionSize; dimCount++) {
+      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+      byte[] col = new byte[aShort];
+      size += 2;
+      CarbonUnsafe.getUnsafe()
+          .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+      size += col.length;
+      rowToFill[dimCount] = col;
+    }
+
+    int nullSetSize = nullSetWords.length * 8;
+    Arrays.fill(nullSetWords, 0);
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+            nullSetSize);
+    size += nullSetSize;
+
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      if (isSet(nullSetWords, mesCount)) {
+        switch (measureDataType[mesCount]) {
+          case SHORT:
+            Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+            size += 2;
+            rowToFill[dimensionSize + mesCount] = sval;
+            break;
+          case INT:
+            Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+            size += 4;
+            rowToFill[dimensionSize + mesCount] = ival;
+            break;
+          case LONG:
+            Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+            size += 8;
+            rowToFill[dimensionSize + mesCount] = val;
+            break;
+          case DOUBLE:
+            Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+            size += 8;
+            rowToFill[dimensionSize + mesCount] = doubleVal;
+            break;
+          case DECIMAL:
+            short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+            byte[] bigDecimalInBytes = new byte[aShort];
+            size += 2;
+            CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+            size += bigDecimalInBytes.length;
+            rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+            break;
+          default:
+            throw new IllegalArgumentException("unsupported data type:" +
+                measureDataType[mesCount]);
+        }
+      } else {
+        rowToFill[dimensionSize + mesCount] = null;
+      }
+    }
+    return rowToFill;
+  }
+
+  public void fillRow(long address, DataOutputStream stream) throws IOException {
+    int dimCount = 0;
+    int size = 0;
+
+    Object baseObject = dataBlock.getBaseObject();
+    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+      if (noDictionaryDimensionMapping[dimCount]) {
+        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+        byte[] col = new byte[aShort];
+        size += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                col.length);
+        size += col.length;
+        stream.writeShort(aShort);
+        stream.write(col);
+      } else {
+        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+        size += 4;
+        stream.writeInt(anInt);
+      }
+    }
+
+    // write complex dimensions here.
+    for (; dimCount < dimensionSize; dimCount++) {
+      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+      byte[] col = new byte[aShort];
+      size += 2;
+      CarbonUnsafe.getUnsafe()
+          .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+      size += col.length;
+      stream.writeShort(aShort);
+      stream.write(col);
+    }
+
+    int nullSetSize = nullSetWords.length * 8;
+    Arrays.fill(nullSetWords, 0);
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+            nullSetSize);
+    size += nullSetSize;
+    for (int i = 0; i < nullSetWords.length; i++) {
+      stream.writeLong(nullSetWords[i]);
+    }
+
+    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+      if (isSet(nullSetWords, mesCount)) {
+        switch (measureDataType[mesCount]) {
+          case SHORT:
+            short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+            size += 2;
+            stream.writeShort(sval);
+            break;
+          case INT:
+            int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+            size += 4;
+            stream.writeInt(ival);
+            break;
+          case LONG:
+            long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+            size += 8;
+            stream.writeLong(val);
+            break;
+          case DOUBLE:
+            double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+            size += 8;
+            stream.writeDouble(doubleVal);
+            break;
+          case DECIMAL:
+            short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+            byte[] bigDecimalInBytes = new byte[aShort];
+            size += 2;
+            CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+            size += bigDecimalInBytes.length;
+            stream.writeShort(aShort);
+            stream.write(bigDecimalInBytes);
+            break;
+          default:
+            throw new IllegalArgumentException("unsupported data type:" +
+                measureDataType[mesCount]);
+        }
+      }
+    }
+  }
+
+  public void freeMemory() {
+    switch (managerType) {
+      case UNSAFE_MEMORY_MANAGER:
+        UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
+        break;
+      default:
+        UnsafeSortMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
+        buffer.freeMemory();
+    }
+  }
+
+  public boolean isSaveToDisk() {
+    return saveToDisk;
+  }
+
+  public IntPointerBuffer getBuffer() {
+    return buffer;
+  }
+
+  public int getUsedSize() {
+    return lastSize;
+  }
+
+  public boolean canAdd() {
+    return lastSize < sizeToBeUsed;
+  }
+
+  public MemoryBlock getDataBlock() {
+    return dataBlock;
+  }
+
+  public static void set(long[] words, int index) {
+    int wordOffset = (index >> 6);
+    words[wordOffset] |= (1L << index);
+  }
+
+  public static void unset(long[] words, int index) {
+    int wordOffset = (index >> 6);
+    words[wordOffset] &= ~(1L << index);
+  }
+
+  public static boolean isSet(long[] words, int index) {
+    int wordOffset = (index >> 6);
+    return ((words[wordOffset] & (1L << index)) != 0);
+  }
+
+  public boolean[] getNoDictionaryDimensionMapping() {
+    return noDictionaryDimensionMapping;
+  }
+
+  public boolean[] getNoDictionarySortColumnMapping() {
+    return noDictionarySortColumnMapping;
+  }
+
+  public void setNewDataBlock(MemoryBlock newMemoryBlock) {
+    this.dataBlock = newMemoryBlock;
+    this.managerType = MemoryManagerType.UNSAFE_SORT_MEMORY_MANAGER;
+  }
+
+  public enum MemoryManagerType {
+    UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
new file mode 100644
index 0000000..88b72aa
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.sort.TimSort;
+import org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDataFormat;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class UnsafeSortDataRows {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeSortDataRows.class.getName());
+  /**
+   * threadStatusObserver
+   */
+  private ThreadStatusObserver threadStatusObserver;
+  /**
+   * executor service for data sort holder
+   */
+  private ExecutorService dataSorterAndWriterExecutorService;
+  /**
+   * semaphore which will used for managing sorted data object arrays
+   */
+
+  private SortParameters parameters;
+
+  private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
+
+  private UnsafeCarbonRowPage rowPage;
+
+  private final Object addRowsLock = new Object();
+
+  private long inMemoryChunkSize;
+
+  private boolean enableInMemoryIntermediateMerge;
+
+  private int bytesAdded;
+
+  private long maxSizeAllowed;
+
+  /**
+   * semaphore which will used for managing sorted data object arrays
+   */
+  private Semaphore semaphore;
+
+  private final long taskId;
+
+  public UnsafeSortDataRows(SortParameters parameters,
+      UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
+    this.parameters = parameters;
+
+    this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
+
+    // observer of writing file in thread
+    this.threadStatusObserver = new ThreadStatusObserver();
+    this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+    this.inMemoryChunkSize = inMemoryChunkSize;
+    this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L;
+    enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
+            CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
+
+    this.maxSizeAllowed = parameters.getBatchSortSizeinMb();
+    if (maxSizeAllowed <= 0) {
+      // If user does not input any memory size, then take half the size of usable memory configured
+      // in sort memory size.
+      this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
+    } else {
+      this.maxSizeAllowed = this.maxSizeAllowed * 1024 * 1024;
+    }
+  }
+
+  /**
+   * This method will be used to initialize
+   */
+  public void initialize() throws MemoryException {
+    MemoryBlock baseBlock =
+        UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+    boolean isMemoryAvailable =
+        UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
+    if (isMemoryAvailable) {
+      UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
+    }
+    this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+        parameters.getNoDictionarySortColumn(),
+        parameters.getDimColCount() + parameters.getComplexDimColCount(),
+        parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
+        !isMemoryAvailable, taskId);
+    // Delete if any older file exists in sort temp folder
+    deleteSortLocationIfExists();
+
+    // create new sort temp directory
+    CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
+    this.dataSorterAndWriterExecutorService =
+        Executors.newFixedThreadPool(parameters.getNumberOfCores());
+    semaphore = new Semaphore(parameters.getNumberOfCores());
+  }
+
+  public boolean canAdd() {
+    return bytesAdded < maxSizeAllowed;
+  }
+
+  /**
+   * This method will be used to add new row
+   *
+   * @param rowBatch new rowBatch
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    synchronized (addRowsLock) {
+      addBatch(rowBatch, size);
+    }
+  }
+
+  /**
+   * This method will be used to add new row
+   *
+   * @param rowBatch new rowBatch
+   * @param size
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  public void addRowBatchWithOutSync(Object[][] rowBatch, int size)
+      throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    addBatch(rowBatch, size);
+  }
+
+  private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+    for (int i = 0; i < size; i++) {
+      if (rowPage.canAdd()) {
+        bytesAdded += rowPage.addRow(rowBatch[i]);
+      } else {
+        try {
+          if (enableInMemoryIntermediateMerge) {
+            unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+          }
+          unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+          semaphore.acquire();
+          dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage));
+          MemoryBlock memoryBlock =
+              UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+          boolean saveToDisk =
+              UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
+          if (!saveToDisk) {
+            UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
+          }
+          rowPage = new UnsafeCarbonRowPage(
+                  parameters.getNoDictionaryDimnesionColumn(),
+                  parameters.getNoDictionarySortColumn(),
+                  parameters.getDimColCount() + parameters.getComplexDimColCount(),
+                  parameters.getMeasureColCount(),
+                  parameters.getMeasureDataType(),
+                  memoryBlock,
+                  saveToDisk, taskId);
+          bytesAdded += rowPage.addRow(rowBatch[i]);
+        } catch (Exception e) {
+          LOGGER.error(
+                  "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+          throw new CarbonSortKeyAndGroupByException(e);
+        }
+
+      }
+    }
+  }
+
+  /**
+   * This method will be used to add new row
+   */
+  public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    if (rowPage.canAdd()) {
+      rowPage.addRow(row);
+    } else {
+      try {
+        if (enableInMemoryIntermediateMerge) {
+          unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+        }
+        unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+        semaphore.acquire();
+        dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
+        MemoryBlock memoryBlock =
+            UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+        boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
+        if (!saveToDisk) {
+          UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
+        }
+        rowPage = new UnsafeCarbonRowPage(
+            parameters.getNoDictionaryDimnesionColumn(),
+            parameters.getNoDictionarySortColumn(),
+            parameters.getDimColCount(), parameters.getMeasureColCount(),
+            parameters.getMeasureDataType(), memoryBlock,
+            saveToDisk, taskId);
+        rowPage.addRow(row);
+      } catch (Exception e) {
+        LOGGER.error(
+            "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+        throw new CarbonSortKeyAndGroupByException(e);
+      }
+
+    }
+  }
+
+  /**
+   * Below method will be used to start storing process This method will get
+   * all the temp files present in sort temp folder then it will create the
+   * record holder heap and then it will read first record from each file and
+   * initialize the heap
+   *
+   * @throws InterruptedException
+   */
+  public void startSorting() throws InterruptedException {
+    LOGGER.info("Unsafe based sorting will be used");
+    if (this.rowPage.getUsedSize() > 0) {
+      TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
+          new UnsafeIntSortDataFormat(rowPage));
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
+        timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
+            new UnsafeRowComparator(rowPage));
+      } else {
+        timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
+            new UnsafeRowComparatorForNormalDIms(rowPage));
+      }
+      unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
+    } else {
+      rowPage.freeMemory();
+    }
+    startFileBasedMerge();
+  }
+
+  private void writeData(UnsafeCarbonRowPage rowPage, File file)
+      throws CarbonSortKeyAndGroupByException {
+    DataOutputStream stream = null;
+    try {
+      // open stream
+      stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
+          parameters.getFileWriteBufferSize()));
+      int actualSize = rowPage.getBuffer().getActualSize();
+      // write number of entries to the file
+      stream.writeInt(actualSize);
+      for (int i = 0; i < actualSize; i++) {
+        rowPage.fillRow(rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(),
+            stream);
+      }
+
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+    } finally {
+      // close streams
+      CarbonUtil.closeStreams(stream);
+    }
+  }
+
+  /**
+   * This method will be used to delete sort temp location is it is exites
+   */
+  public void deleteSortLocationIfExists() {
+    CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
+  }
+
+  /**
+   * Below method will be used to start file based merge
+   *
+   * @throws InterruptedException
+   */
+  private void startFileBasedMerge() throws InterruptedException {
+    dataSorterAndWriterExecutorService.shutdown();
+    dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
+  }
+
+  /**
+   * Observer class for thread execution
+   * In case of any failure we need stop all the running thread
+   */
+  private class ThreadStatusObserver {
+    /**
+     * Below method will be called if any thread fails during execution
+     *
+     * @param exception
+     * @throws CarbonSortKeyAndGroupByException
+     */
+    public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
+      dataSorterAndWriterExecutorService.shutdownNow();
+      unsafeInMemoryIntermediateFileMerger.close();
+      parameters.getObserver().setFailed(true);
+      LOGGER.error(exception);
+      throw new CarbonSortKeyAndGroupByException(exception);
+    }
+  }
+
+  /**
+   * This class is responsible for sorting and writing the object
+   * array which holds the records equal to given array size
+   */
+  private class DataSorterAndWriter implements Runnable {
+    private UnsafeCarbonRowPage page;
+
+    public DataSorterAndWriter(UnsafeCarbonRowPage rowPage) {
+      this.page = rowPage;
+    }
+
+    @Override
+    public void run() {
+      try {
+        long startTime = System.currentTimeMillis();
+        TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
+            new UnsafeIntSortDataFormat(page));
+        // if sort_columns is not none, sort by sort_columns
+        if (parameters.getNumberOfNoDictSortColumns() > 0) {
+          timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
+              new UnsafeRowComparator(page));
+        } else {
+          timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
+              new UnsafeRowComparatorForNormalDIms(page));
+        }
+        if (page.isSaveToDisk()) {
+          // create a new file every time
+          // create a new file and pick a temp directory randomly every time
+          String tmpDir = parameters.getTempFileLocation()[
+              new Random().nextInt(parameters.getTempFileLocation().length)];
+          File sortTempFile = new File(
+              tmpDir + File.separator + parameters.getTableName()
+                  + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+          writeData(page, sortTempFile);
+          LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
+              + " and write is: " + (System.currentTimeMillis() - startTime));
+          page.freeMemory();
+          // add sort temp filename to and arrayList. When the list size reaches 20 then
+          // intermediate merging of sort temp files will be triggered
+          unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
+        } else {
+          // creating a new memory block as size is already allocated
+          // so calling lazy memory allocator
+          MemoryBlock newMemoryBlock = UnsafeSortMemoryManager.INSTANCE
+              .allocateMemoryLazy(taskId, page.getDataBlock().size());
+          // copying data from working memory manager to sortmemory manager
+          CarbonUnsafe.getUnsafe()
+              .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(),
+                  newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(),
+                  page.getDataBlock().size());
+          // free unsafememory manager
+          page.freeMemory();
+          page.setNewDataBlock(newMemoryBlock);
+          // add sort temp filename to and arrayList. When the list size reaches 20 then
+          // intermediate merging of sort temp files will be triggered
+          page.getBuffer().loadToUnsafe();
+          unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(page);
+          LOGGER.info(
+              "Time taken to sort row page with size" + page.getBuffer().getActualSize() + "is: "
+                  + (System.currentTimeMillis() - startTime));
+        }
+      } catch (Throwable e) {
+        try {
+          threadStatusObserver.notifyFailed(e);
+        } catch (CarbonSortKeyAndGroupByException ex) {
+          LOGGER.error(e);
+        }
+      } finally {
+        semaphore.release();
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
new file mode 100644
index 0000000..d02be9b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+
+public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
+
+  /**
+   * mapping of dictionary and no dictionary of sort_columns.
+   */
+  private boolean[] noDictionarySortColumnMaping;
+
+  private Object baseObject;
+
+  public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
+    this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
+    this.baseObject = rowPage.getDataBlock().getBaseObject();
+  }
+
+  /**
+   * Below method will be used to compare two mdkey
+   */
+  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+    int diff = 0;
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    int sizeA = 0;
+    int sizeB = 0;
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+      if (isNoDictionary) {
+        short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowA + sizeA);
+        byte[] byteArr1 = new byte[aShort1];
+        sizeA += 2;
+        CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowA + sizeA, byteArr1,
+            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort1);
+        sizeA += aShort1;
+
+        short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowB + sizeB);
+        byte[] byteArr2 = new byte[aShort2];
+        sizeB += 2;
+        CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowB + sizeB, byteArr2,
+            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort2);
+        sizeB += aShort2;
+
+        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+        if (difference != 0) {
+          return difference;
+        }
+      } else {
+        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
+        sizeA += 4;
+        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
+        sizeB += 4;
+        diff = dimFieldA - dimFieldB;
+        if (diff != 0) {
+          return diff;
+        }
+      }
+    }
+
+    return diff;
+  }
+
+  /**
+   * Below method will be used to compare two mdkey
+   */
+  public int compare(UnsafeCarbonRow rowL, Object baseObjectL, UnsafeCarbonRow rowR,
+      Object baseObjectR) {
+    int diff = 0;
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    int sizeA = 0;
+    int sizeB = 0;
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+      if (isNoDictionary) {
+        short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObjectL, rowA + sizeA);
+        byte[] byteArr1 = new byte[aShort1];
+        sizeA += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObjectL, rowA + sizeA, byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                aShort1);
+        sizeA += aShort1;
+
+        short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObjectR, rowB + sizeB);
+        byte[] byteArr2 = new byte[aShort2];
+        sizeB += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObjectR, rowB + sizeB, byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+                aShort2);
+        sizeB += aShort2;
+
+        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+        if (difference != 0) {
+          return difference;
+        }
+      } else {
+        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeA);
+        sizeA += 4;
+        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeB);
+        sizeB += 4;
+        diff = dimFieldA - dimFieldB;
+        if (diff != 0) {
+          return diff;
+        }
+      }
+    }
+
+    return diff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
new file mode 100644
index 0000000..483dcb2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+
+public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> {
+
+  private Object baseObject;
+
+  private int numberOfSortColumns;
+
+  public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
+    this.baseObject = rowPage.getDataBlock().getBaseObject();
+    this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
+  }
+
+  /**
+   * Below method will be used to compare two mdkey
+   */
+  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+    int diff = 0;
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    int sizeA = 0;
+    int sizeB = 0;
+    for (int i = 0; i < numberOfSortColumns; i++) {
+      int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
+      sizeA += 4;
+      int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
+      sizeB += 4;
+      diff = dimFieldA - dimFieldB;
+      if (diff != 0) {
+        return diff;
+      }
+    }
+
+    return diff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
new file mode 100644
index 0000000..686e855
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Interface for merging temporary sort files/ inmemory data
+ */
+public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> {
+
+  boolean hasNext();
+
+  void readRow()  throws CarbonSortKeyAndGroupByException;
+
+  Object[] getRow();
+
+  int numberOfRows();
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRow.java
new file mode 100644
index 0000000..2a8dc5b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRow.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+public class UnsafeCarbonRow {
+
+  public long address;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRowForMerge.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
new file mode 100644
index 0000000..a4bb684
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+public class UnsafeCarbonRowForMerge extends UnsafeCarbonRow {
+
+  public byte index;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
new file mode 100644
index 0000000..6b0cfa6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+
+public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeFinalMergePageHolder.class.getName());
+
+  private int counter;
+
+  private int actualSize;
+
+  private long[] mergedAddresses;
+
+  private byte[] rowPageIndexes;
+
+  private UnsafeCarbonRowPage[] rowPages;
+
+  private NewRowComparator comparator;
+
+  private Object[] currentRow;
+
+  private int columnSize;
+
+  public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
+      boolean[] noDictSortColumnMapping, int columnSize) {
+    this.actualSize = merger.getEntryCount();
+    this.mergedAddresses = merger.getMergedAddresses();
+    this.rowPageIndexes = merger.getRowPageIndexes();
+    this.rowPages = merger.getUnsafeCarbonRowPages();
+    LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
+    this.comparator = new NewRowComparator(noDictSortColumnMapping);
+    this.columnSize = columnSize;
+  }
+
+  public boolean hasNext() {
+    if (counter < actualSize) {
+      return true;
+    }
+    return false;
+  }
+
+  public void readRow() {
+    currentRow = new Object[columnSize];
+    rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter], currentRow);
+    counter++;
+  }
+
+  public Object[] getRow() {
+    return currentRow;
+  }
+
+  @Override public int compareTo(SortTempChunkHolder o) {
+    return comparator.compare(currentRow, o.getRow());
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (!(obj instanceof UnsafeFinalMergePageHolder)) {
+      return false;
+    }
+
+    UnsafeFinalMergePageHolder o = (UnsafeFinalMergePageHolder) obj;
+    return this == o;
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
+
+  public int numberOfRows() {
+    return actualSize;
+  }
+
+  public void close() {
+    for (int i = 0; i < rowPages.length; i++) {
+      rowPages[i].freeMemory();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
new file mode 100644
index 0000000..6f05088
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+
+public class UnsafeInmemoryHolder implements SortTempChunkHolder {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeInmemoryHolder.class.getName());
+
+  private int counter;
+
+  private int actualSize;
+
+  private UnsafeCarbonRowPage rowPage;
+
+  private Object[] currentRow;
+
+  private long address;
+
+  private NewRowComparator comparator;
+
+  private int columnSize;
+
+  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
+      int numberOfSortColumns) {
+    this.actualSize = rowPage.getBuffer().getActualSize();
+    this.rowPage = rowPage;
+    LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
+    this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
+    this.columnSize = columnSize;
+  }
+
+  public boolean hasNext() {
+    if (counter < actualSize) {
+      return true;
+    }
+    return false;
+  }
+
+  public void readRow() {
+    currentRow = new Object[columnSize];
+    address = rowPage.getBuffer().get(counter);
+    rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow);
+    counter++;
+  }
+
+  public Object[] getRow() {
+    return currentRow;
+  }
+
+  @Override public int compareTo(SortTempChunkHolder o) {
+    return comparator.compare(currentRow, o.getRow());
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (!(obj instanceof UnsafeInmemoryHolder)) {
+      return false;
+    }
+
+    UnsafeInmemoryHolder o = (UnsafeInmemoryHolder)obj;
+
+    return this == o;
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
+
+  public int numberOfRows() {
+    return actualSize;
+  }
+
+  public void close() {
+    rowPage.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
new file mode 100644
index 0000000..3b9d8d7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
+
+/**
+ * It is used for merging unsafe inmemory intermediate data
+ */
+public class UnsafeInmemoryMergeHolder implements Comparable<UnsafeInmemoryMergeHolder> {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeInmemoryMergeHolder.class.getName());
+
+  private int counter;
+
+  private int actualSize;
+
+  private UnsafeCarbonRowPage rowPage;
+
+  private UnsafeCarbonRowForMerge currentRow;
+
+  private long address;
+
+  private UnsafeRowComparator comparator;
+
+  private Object baseObject;
+
+  private byte index;
+
+  public UnsafeInmemoryMergeHolder(UnsafeCarbonRowPage rowPage, byte index) {
+    this.actualSize = rowPage.getBuffer().getActualSize();
+    this.rowPage = rowPage;
+    LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
+    this.comparator = new UnsafeRowComparator(rowPage);
+    this.baseObject = rowPage.getDataBlock().getBaseObject();
+    currentRow = new UnsafeCarbonRowForMerge();
+    this.index = index;
+  }
+
+  public boolean hasNext() {
+    if (counter < actualSize) {
+      return true;
+    }
+    return false;
+  }
+
+  public void readRow() {
+    address = rowPage.getBuffer().get(counter);
+    currentRow = new UnsafeCarbonRowForMerge();
+    currentRow.address = address + rowPage.getDataBlock().getBaseOffset();
+    currentRow.index = index;
+    counter++;
+  }
+
+  public UnsafeCarbonRowForMerge getRow() {
+    return currentRow;
+  }
+
+  @Override public int compareTo(UnsafeInmemoryMergeHolder o) {
+    return comparator.compare(currentRow, baseObject, o.getRow(), o.getBaseObject());
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (!(obj instanceof UnsafeInmemoryMergeHolder)) {
+      return false;
+    }
+
+    UnsafeInmemoryMergeHolder o = (UnsafeInmemoryMergeHolder)obj;
+    return this == o;
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
+
+  public Object getBaseObject() {
+    return baseObject;
+  }
+
+  public void close() {
+    rowPage.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
new file mode 100644
index 0000000..331b9db
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeSortTempFileChunkHolder.class.getName());
+
+  /**
+   * temp file
+   */
+  private File tempFile;
+
+  /**
+   * read stream
+   */
+  private DataInputStream stream;
+
+  /**
+   * entry count
+   */
+  private int entryCount;
+
+  /**
+   * return row
+   */
+  private Object[] returnRow;
+
+  /**
+   * number of measures
+   */
+  private int measureCount;
+
+  /**
+   * number of dimensionCount
+   */
+  private int dimensionCount;
+
+  /**
+   * number of complexDimensionCount
+   */
+  private int complexDimensionCount;
+
+  /**
+   * fileBufferSize for file reader stream size
+   */
+  private int fileBufferSize;
+
+  private Object[][] currentBuffer;
+
+  private Object[][] backupBuffer;
+
+  private boolean isBackupFilled;
+
+  private boolean prefetch;
+
+  private int bufferSize;
+
+  private int bufferRowCounter;
+
+  private ExecutorService executorService;
+
+  private Future<Void> submit;
+
+  private int prefetchRecordsProceesed;
+
+  /**
+   * sortTempFileNoOFRecordsInCompression
+   */
+  private int sortTempFileNoOFRecordsInCompression;
+
+  /**
+   * isSortTempFileCompressionEnabled
+   */
+  private boolean isSortTempFileCompressionEnabled;
+
+  /**
+   * totalRecordFetch
+   */
+  private int totalRecordFetch;
+
+  private int noDictionaryCount;
+
+  private DataType[] measureDataType;
+
+  private int numberOfObjectRead;
+  /**
+   * to store whether dimension is of dictionary type or not
+   */
+  private boolean[] isNoDictionaryDimensionColumn;
+
+  private int nullSetWordsLength;
+
+  private Comparator<Object[]> comparator;
+
+  /**
+   * Constructor to initialize
+   */
+  public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
+    // set temp file
+    this.tempFile = tempFile;
+
+    // set measure and dimension count
+    this.measureCount = parameters.getMeasureColCount();
+    this.dimensionCount = parameters.getDimColCount();
+    this.complexDimensionCount = parameters.getComplexDimColCount();
+
+    this.noDictionaryCount = parameters.getNoDictionaryCount();
+    // set mdkey length
+    this.fileBufferSize = parameters.getFileBufferSize();
+    this.executorService = Executors.newFixedThreadPool(1);
+    this.measureDataType = parameters.getMeasureDataType();
+    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+    this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
+    comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
+    initialize();
+  }
+
+  /**
+   * This method will be used to initialize
+   *
+   * @throws CarbonSortKeyAndGroupByException problem while initializing
+   */
+  public void initialize() {
+    prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
+            CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
+    bufferSize = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+            CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
+    this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
+            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
+    if (this.isSortTempFileCompressionEnabled) {
+      LOGGER.info("Compression was used while writing the sortTempFile");
+    }
+
+    try {
+      this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
+              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
+      if (this.sortTempFileNoOFRecordsInCompression < 1) {
+        LOGGER.error("Invalid value for: "
+            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+            + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
+            + " be used");
+
+        this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
+            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error(
+          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+              + ", only Positive Integer value is allowed.Default value will be used");
+      this.sortTempFileNoOFRecordsInCompression = Integer
+          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+    }
+
+    initialise();
+  }
+
+  private void initialise() {
+    try {
+      if (isSortTempFileCompressionEnabled) {
+        this.bufferSize = sortTempFileNoOFRecordsInCompression;
+      }
+      stream = new DataInputStream(
+          new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
+      this.entryCount = stream.readInt();
+      LOGGER.audit("Processing unsafe mode file rows with size : " + entryCount);
+      if (prefetch) {
+        new DataFetcher(false).call();
+        totalRecordFetch += currentBuffer.length;
+        if (totalRecordFetch < this.entryCount) {
+          submit = executorService.submit(new DataFetcher(true));
+        }
+      } else {
+        if (isSortTempFileCompressionEnabled) {
+          new DataFetcher(false).call();
+        }
+      }
+
+    } catch (FileNotFoundException e) {
+      LOGGER.error(e);
+      throw new RuntimeException(tempFile + " No Found", e);
+    } catch (IOException e) {
+      LOGGER.error(e);
+      throw new RuntimeException(tempFile + " No Found", e);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new RuntimeException(tempFile + " Problem while reading", e);
+    }
+  }
+
+  /**
+   * This method will be used to read new row from file
+   *
+   * @throws CarbonSortKeyAndGroupByException problem while reading
+   */
+  public void readRow() throws CarbonSortKeyAndGroupByException {
+    if (prefetch) {
+      fillDataForPrefetch();
+    } else if (isSortTempFileCompressionEnabled) {
+      if (bufferRowCounter >= bufferSize) {
+        try {
+          new DataFetcher(false).call();
+          bufferRowCounter = 0;
+        } catch (Exception e) {
+          LOGGER.error(e);
+          throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+        }
+
+      }
+      prefetchRecordsProceesed++;
+      returnRow = currentBuffer[bufferRowCounter++];
+    } else {
+      this.returnRow = getRowFromStream();
+    }
+  }
+
+  private void fillDataForPrefetch() {
+    if (bufferRowCounter >= bufferSize) {
+      if (isBackupFilled) {
+        bufferRowCounter = 0;
+        currentBuffer = backupBuffer;
+        totalRecordFetch += currentBuffer.length;
+        isBackupFilled = false;
+        if (totalRecordFetch < this.entryCount) {
+          submit = executorService.submit(new DataFetcher(true));
+        }
+      } else {
+        try {
+          submit.get();
+        } catch (Exception e) {
+          LOGGER.error(e);
+        }
+        bufferRowCounter = 0;
+        currentBuffer = backupBuffer;
+        isBackupFilled = false;
+        totalRecordFetch += currentBuffer.length;
+        if (totalRecordFetch < this.entryCount) {
+          submit = executorService.submit(new DataFetcher(true));
+        }
+      }
+    }
+    prefetchRecordsProceesed++;
+    returnRow = currentBuffer[bufferRowCounter++];
+  }
+
+  /**
+   * @return
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
+    Object[] row = new Object[dimensionCount + measureCount];
+    try {
+      int dimCount = 0;
+      for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+        if (isNoDictionaryDimensionColumn[dimCount]) {
+          short aShort = stream.readShort();
+          byte[] col = new byte[aShort];
+          stream.readFully(col);
+          row[dimCount] = col;
+        } else {
+          int anInt = stream.readInt();
+          row[dimCount] = anInt;
+        }
+      }
+
+      // write complex dimensions here.
+      for (; dimCount < dimensionCount; dimCount++) {
+        short aShort = stream.readShort();
+        byte[] col = new byte[aShort];
+        stream.readFully(col);
+        row[dimCount] = col;
+      }
+
+      long[] words = new long[nullSetWordsLength];
+      for (int i = 0; i < words.length; i++) {
+        words[i] = stream.readLong();
+      }
+
+      for (int mesCount = 0; mesCount < measureCount; mesCount++) {
+        if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
+          switch (measureDataType[mesCount]) {
+            case SHORT:
+              row[dimensionCount + mesCount] = stream.readShort();
+              break;
+            case INT:
+              row[dimensionCount + mesCount] = stream.readInt();
+              break;
+            case LONG:
+              row[dimensionCount + mesCount] = stream.readLong();
+              break;
+            case DOUBLE:
+              row[dimensionCount + mesCount] = stream.readDouble();
+              break;
+            case DECIMAL:
+              short aShort = stream.readShort();
+              byte[] bigDecimalInBytes = new byte[aShort];
+              stream.readFully(bigDecimalInBytes);
+              row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+              break;
+            default:
+              throw new IllegalArgumentException("unsupported data type:" +
+                  measureDataType[mesCount]);
+          }
+        }
+      }
+      return row;
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException(e);
+    }
+  }
+
+  /**
+   * below method will be used to get the row
+   *
+   * @return row
+   */
+  public Object[] getRow() {
+    return this.returnRow;
+  }
+
+  /**
+   * below method will be used to check whether any more records are present
+   * in file or not
+   *
+   * @return more row present in file
+   */
+  public boolean hasNext() {
+    if (prefetch || isSortTempFileCompressionEnabled) {
+      return this.prefetchRecordsProceesed < this.entryCount;
+    }
+    return this.numberOfObjectRead < this.entryCount;
+  }
+
+  /**
+   * Below method will be used to close streams
+   */
+  public void close() {
+    CarbonUtil.closeStreams(stream);
+    executorService.shutdown();
+  }
+
+  /**
+   * This method will number of entries
+   *
+   * @return entryCount
+   */
+  public int numberOfRows() {
+    return entryCount;
+  }
+
+  @Override public int compareTo(SortTempChunkHolder other) {
+    return comparator.compare(returnRow, other.getRow());
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (!(obj instanceof UnsafeSortTempFileChunkHolder)) {
+      return false;
+    }
+    UnsafeSortTempFileChunkHolder o = (UnsafeSortTempFileChunkHolder) obj;
+
+    return this == o;
+  }
+
+  @Override public int hashCode() {
+    int hash = 0;
+    hash += 31 * measureCount;
+    hash += 31 * dimensionCount;
+    hash += 31 * complexDimensionCount;
+    hash += 31 * noDictionaryCount;
+    hash += tempFile.hashCode();
+    return hash;
+  }
+
+  private final class DataFetcher implements Callable<Void> {
+    private boolean isBackUpFilling;
+
+    private int numberOfRecords;
+
+    private DataFetcher(boolean backUp) {
+      isBackUpFilling = backUp;
+      calculateNumberOfRecordsToBeFetched();
+    }
+
+    private void calculateNumberOfRecordsToBeFetched() {
+      int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch;
+      numberOfRecords =
+          bufferSize < numberOfRecordsLeftToBeRead ? bufferSize : numberOfRecordsLeftToBeRead;
+    }
+
+    @Override public Void call() throws Exception {
+      try {
+        if (isBackUpFilling) {
+          backupBuffer = prefetchRecordsFromFile(numberOfRecords);
+          isBackupFilled = true;
+        } else {
+          currentBuffer = prefetchRecordsFromFile(numberOfRecords);
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+      }
+      return null;
+    }
+
+  }
+
+  /**
+   * This method will read the records from sort temp file and keep it in a buffer
+   *
+   * @param numberOfRecords
+   * @return
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
+      throws CarbonSortKeyAndGroupByException {
+    Object[][] records = new Object[numberOfRecords][];
+    for (int i = 0; i < numberOfRecords; i++) {
+      records[i] = getRowFromStream();
+    }
+    return records;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
new file mode 100644
index 0000000..3955864
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.merger;
+
+import java.util.AbstractQueue;
+import java.util.PriorityQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRowForMerge;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public class UnsafeInMemoryIntermediateDataMerger implements Runnable {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeInMemoryIntermediateDataMerger.class.getName());
+
+  /**
+   * recordHolderHeap
+   */
+  private AbstractQueue<UnsafeInmemoryMergeHolder> recordHolderHeap;
+
+  /**
+   * fileCounter
+   */
+  private int holderCounter;
+
+  /**
+   * entryCount
+   */
+  private int entryCount;
+
+  private UnsafeCarbonRowPage[] unsafeCarbonRowPages;
+
+  private long[] mergedAddresses;
+
+  private byte[] rowPageIndexes;
+
+  /**
+   * IntermediateFileMerger Constructor
+   */
+  public UnsafeInMemoryIntermediateDataMerger(UnsafeCarbonRowPage[] unsafeCarbonRowPages,
+      int totalSize) {
+    this.holderCounter = unsafeCarbonRowPages.length;
+    this.unsafeCarbonRowPages = unsafeCarbonRowPages;
+    this.mergedAddresses = new long[totalSize];
+    this.rowPageIndexes = new byte[totalSize];
+    this.entryCount = 0;
+  }
+
+  @Override
+  public void run() {
+    long intermediateMergeStartTime = System.currentTimeMillis();
+    int holderCounterConst = holderCounter;
+    try {
+      startSorting();
+      while (hasNext()) {
+        writeDataToMemory(next());
+      }
+      double intermediateMergeCostTime =
+          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
+      LOGGER.info("============================== Intermediate Merge of " + holderCounterConst
+          + " in-memory sort Cost Time: " + intermediateMergeCostTime + "(s)");
+    } catch (Exception e) {
+      LOGGER.error(e, "Problem while intermediate merging");
+    }
+  }
+
+  /**
+   * This method will be used to get the sorted record from file
+   *
+   * @return sorted record sorted record
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private UnsafeCarbonRowForMerge getSortedRecordFromMemory()
+      throws CarbonSortKeyAndGroupByException {
+    UnsafeCarbonRowForMerge row = null;
+
+    // poll the top object from heap
+    // heap maintains binary tree which is based on heap condition that will
+    // be based on comparator we are passing the heap
+    // when will call poll it will always delete root of the tree and then
+    // it does trickel down operation complexity is log(n)
+    UnsafeInmemoryMergeHolder poll = this.recordHolderHeap.poll();
+
+    // get the row from chunk
+    row = poll.getRow();
+
+    // check if there no entry present
+    if (!poll.hasNext()) {
+      // change the file counter
+      --this.holderCounter;
+
+      // reaturn row
+      return row;
+    }
+
+    // read new row
+    poll.readRow();
+
+    // add to heap
+    this.recordHolderHeap.add(poll);
+
+    // return row
+    return row;
+  }
+
+  /**
+   * Below method will be used to start storing process This method will get
+   * all the temp files present in sort temp folder then it will create the
+   * record holder heap and then it will read first record from each file and
+   * initialize the heap
+   *
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private void startSorting() throws CarbonSortKeyAndGroupByException {
+    LOGGER.info("Number of row pages in intermediate merger: " + this.holderCounter);
+
+    // create record holder heap
+    createRecordHolderQueue(unsafeCarbonRowPages);
+
+    // iterate over file list and create chunk holder and add to heap
+    LOGGER.info("Started adding first record from row page");
+
+    UnsafeInmemoryMergeHolder unsafePageHolder = null;
+    byte index = 0;
+    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
+      // create chunk holder
+      unsafePageHolder = new UnsafeInmemoryMergeHolder(unsafeCarbonRowPage, index++);
+
+      // initialize
+      unsafePageHolder.readRow();
+
+      // add to heap
+      this.recordHolderHeap.add(unsafePageHolder);
+    }
+
+    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+  }
+
+  /**
+   * This method will be used to create the heap which will be used to hold
+   * the chunk of data
+   */
+  private void createRecordHolderQueue(UnsafeCarbonRowPage[] pages) {
+    // creating record holder heap
+    this.recordHolderHeap = new PriorityQueue<UnsafeInmemoryMergeHolder>(pages.length);
+  }
+
+  /**
+   * This method will be used to get the sorted row
+   *
+   * @return sorted row
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private UnsafeCarbonRowForMerge next() throws CarbonSortKeyAndGroupByException {
+    return getSortedRecordFromMemory();
+  }
+
+  /**
+   * This method will be used to check whether any more element is present or
+   * not
+   *
+   * @return more element is present
+   */
+  private boolean hasNext() {
+    return this.holderCounter > 0;
+  }
+
+  /**
+   * Below method will be used to write data to file
+   */
+  private void writeDataToMemory(UnsafeCarbonRowForMerge row) {
+    mergedAddresses[entryCount] = row.address;
+    rowPageIndexes[entryCount] = row.index;
+    entryCount++;
+  }
+
+  public int getEntryCount() {
+    return entryCount;
+  }
+
+  public UnsafeCarbonRowPage[] getUnsafeCarbonRowPages() {
+    return unsafeCarbonRowPages;
+  }
+
+  public long[] getMergedAddresses() {
+    return mergedAddresses;
+  }
+
+  public byte[] getRowPageIndexes() {
+    return rowPageIndexes;
+  }
+}