Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/07/20 12:44:32 UTC

carbondata git commit: [CARBONDATA-1318] Fixed concurrent table data loading unsafe memory issue

Repository: carbondata
Updated Branches:
  refs/heads/master 69c634c5f -> df22368d9


[CARBONDATA-1318] Fixed concurrent table data loading unsafe memory issue

Fixed task cancellation leak issue
Fixed task cleanup issue in data loading
Fixed concurrent table data loading unsafe memory issue
@CarbonDataQA

This closes #1185
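
The common thread in these fixes is that every unsafe allocation is now keyed by a task id, so all blocks held by a task can be released in a single call when the task finishes, fails or is cancelled. A minimal sketch of the new calling pattern, assuming ThreadLocalTaskInfo has already been populated for the current thread (as CarbonRDD.compute does in the diff below):

    // Sketch only: per-task allocation and cleanup with the APIs added in this commit.
    import org.apache.carbondata.core.memory.MemoryBlock;
    import org.apache.carbondata.core.memory.MemoryException;
    import org.apache.carbondata.core.memory.UnsafeMemoryManager;
    import org.apache.carbondata.core.util.ThreadLocalTaskInfo;

    public class PerTaskUnsafeMemoryExample {
      public static void run() throws MemoryException {
        // task id registered earlier for this thread (see CarbonRDD.compute)
        long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
        // allocation retries until memory becomes free, then registers the block under taskId
        MemoryBlock block = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, 64 * 1024);
        try {
          // ... write to the block through its base object/offset ...
        } finally {
          // free the single block, or rely on freeMemoryAll at task completion
          UnsafeMemoryManager.INSTANCE.freeMemory(taskId, block);
        }
        // releases whatever the task still holds, so a cancelled task cannot leak memory
        UnsafeMemoryManager.INSTANCE.freeMemoryAll(taskId);
      }
    }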


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/df22368d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/df22368d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/df22368d

Branch: refs/heads/master
Commit: df22368d98f0390cc7f9c1289a81257adf4509a6
Parents: 69c634c
Author: kumarvishal <ku...@gmail.com>
Authored: Wed Jul 19 13:42:52 2017 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Thu Jul 20 18:12:52 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +-
 .../page/UnsafeFixLengthColumnPage.java         |  14 +-
 .../page/UnsafeVarLengthColumnPage.java         |  13 +-
 .../core/memory/IntPointerBuffer.java           |  42 ++--
 .../core/memory/UnsafeMemoryManager.java        |  99 +++++---
 .../core/memory/UnsafeSortMemoryManager.java    | 249 +++++++++++++++++++
 .../executor/impl/AbstractQueryExecutor.java    |   3 +
 .../carbondata/core/util/CarbonTaskInfo.java    |  40 +++
 .../core/util/ThreadLocalTaskInfo.java          |  33 +++
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |   5 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  24 +-
 .../carbondata/spark/util/CommonUtil.scala      |  23 +-
 .../processing/csvload/CSVInputFormat.java      |   3 +
 .../processing/newflow/DataLoadExecutor.java    |  19 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        |  32 ++-
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |  54 +++-
 16 files changed, 554 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ccb6344..dfc2153 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1184,9 +1184,11 @@ public final class CarbonCommonConstants {
   public static final String OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT = "64";
   @CarbonProperty
   public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB = "sort.inmemory.size.inmb";
-
   public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT = "1024";
 
+  @CarbonProperty
+  public static final String UNSAFE_WORKING_MEMORY_IN_MB = "carbon.unsafe.working.memory.in.mb";
+  public static final String UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT = "512";
   /**
    * Sorts the data in batches and writes the batch data to store with index file.
    */
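
With the new property above, the unsafe working memory (column pages and in-flight sort pages) is sized on its own, defaulting to 512 MB, while sort.inmemory.size.inmb keeps sizing the sort-memory pool. A hedged example of tuning both before a load, assuming CarbonProperties.addProperty is available as it is elsewhere in the code base:

    // Sketch only: sizing the two unsafe memory pools separately.
    import org.apache.carbondata.core.constants.CarbonCommonConstants;
    import org.apache.carbondata.core.util.CarbonProperties;

    public class UnsafeMemoryConfigExample {
      public static void configure() {
        // working memory for column pages and sort row pages (default 512 MB)
        CarbonProperties.getInstance()
            .addProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024");
        // memory for holding sorted data before merge (default 1024 MB)
        CarbonProperties.getInstance()
            .addProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, "2048");
      }
    }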

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index e76c2c4..5dcc685 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
 
@@ -42,6 +43,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   // base offset of memoryBlock
   private long baseOffset;
 
+  private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
   private static final int byteBits = BYTE.getSizeBits();
   private static final int shortBits = DataType.SHORT.getSizeBits();
   private static final int intBits = DataType.INT.getSizeBits();
@@ -59,13 +62,13 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
       case FLOAT:
       case DOUBLE:
         int size = pageSize << dataType.getSizeBits();
-        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(size);
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
         baseAddress = memoryBlock.getBaseObject();
         baseOffset = memoryBlock.getBaseOffset();
         break;
       case SHORT_INT:
         size = pageSize * 3;
-        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(size);
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
         baseAddress = memoryBlock.getBaseObject();
         baseOffset = memoryBlock.getBaseOffset();
         break;
@@ -302,7 +305,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   public void freeMemory() {
     if (memoryBlock != null) {
-      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
       memoryBlock = null;
       baseAddress = null;
       baseOffset = 0;
@@ -360,13 +363,14 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
       // use raw compression and copy to byte[]
       int inputSize = pageSize * dataType.getSizeInBytes();
       int compressedMaxSize = compressor.maxCompressedLength(inputSize);
-      MemoryBlock compressed = UnsafeMemoryManager.allocateMemoryWithRetry(compressedMaxSize);
+      MemoryBlock compressed =
+          UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize);
       long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset());
       assert outSize < Integer.MAX_VALUE;
       byte[] output = new byte[(int) outSize];
       CarbonUnsafe.unsafe.copyMemory(compressed.getBaseObject(), compressed.getBaseOffset(), output,
           CarbonUnsafe.BYTE_ARRAY_OFFSET, outSize);
-      UnsafeMemoryManager.INSTANCE.freeMemory(compressed);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, compressed);
       return output;
     } else {
       return super.compress(compressor);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index dd6abc5..0cd64db 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 // This extension uses unsafe memory to store page data, for variable length data type (string,
 // decimal)
@@ -47,6 +48,8 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   private static final double FACTOR = 1.25;
 
+  private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
   /**
    * create a page
    * @param dataType data type
@@ -55,7 +58,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
   UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
     super(dataType, pageSize);
     capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
-    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
+    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long)(capacity));
     baseAddress = memoryBlock.getBaseObject();
     baseOffset = memoryBlock.getBaseOffset();
   }
@@ -69,7 +72,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
   UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity) throws MemoryException {
     super(dataType, pageSize);
     this.capacity = capacity;
-    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
+    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long)(capacity));
     baseAddress = memoryBlock.getBaseObject();
     baseOffset = memoryBlock.getBaseOffset();
   }
@@ -77,7 +80,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
   @Override
   public void freeMemory() {
     if (memoryBlock != null) {
-      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
       memoryBlock = null;
       baseAddress = null;
       baseOffset = 0;
@@ -90,10 +93,10 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
   private void ensureMemory(int requestSize) throws MemoryException {
     if (totalLength + requestSize > capacity) {
       int newSize = 2 * capacity;
-      MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(newSize);
+      MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
       CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset,
           newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
-      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
       memoryBlock = newBlock;
       baseAddress = newBlock.getBaseObject();
       baseOffset = newBlock.getBaseOffset();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
index 0d604fd..dadb1e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
@@ -17,26 +17,32 @@
 
 package org.apache.carbondata.core.memory;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
 /**
  * Holds the pointers for rows.
  */
 public class IntPointerBuffer {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(IntPointerBuffer.class.getName());
+
   private int length;
 
   private int actualSize;
 
   private int[] pointerBlock;
 
-  private MemoryBlock baseBlock;
-
   private MemoryBlock pointerMemoryBlock;
 
-  public IntPointerBuffer(MemoryBlock baseBlock) {
+  private long taskId;
+
+  public IntPointerBuffer(long taskId) {
     // TODO can be configurable, it is initial size and it can grow automatically.
     this.length = 100000;
     pointerBlock = new int[length];
-    this.baseBlock = baseBlock;
+    this.taskId = taskId;
   }
 
   public IntPointerBuffer(int length) {
@@ -67,24 +73,25 @@ public class IntPointerBuffer {
     return pointerBlock[rowId];
   }
 
-  public void loadToUnsafe() throws MemoryException {
-    pointerMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(pointerBlock.length * 4);
-    for (int i = 0; i < pointerBlock.length; i++) {
-      CarbonUnsafe.unsafe
-          .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
-              pointerBlock[i]);
+  public void loadToUnsafe() {
+    try {
+      pointerMemoryBlock =
+          UnsafeSortMemoryManager.allocateMemoryWithRetry(this.taskId, pointerBlock.length * 4);
+      for (int i = 0; i < pointerBlock.length; i++) {
+        CarbonUnsafe.unsafe
+            .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
+                pointerBlock[i]);
+      }
+      pointerBlock = null;
+    } catch (MemoryException e) {
+      LOGGER.warn("Not enough memory for allocating pointer buffer, sorting in heap");
     }
-    pointerBlock = null;
   }
 
   public int getActualSize() {
     return actualSize;
   }
 
-  public MemoryBlock getBaseBlock() {
-    return baseBlock;
-  }
-
   public int[] getPointerBlock() {
     return pointerBlock;
   }
@@ -103,10 +110,7 @@ public class IntPointerBuffer {
   public void freeMemory() {
     pointerBlock = null;
     if (pointerMemoryBlock != null) {
-      UnsafeMemoryManager.INSTANCE.freeMemory(pointerMemoryBlock);
-    }
-    if (baseBlock != null) {
-      UnsafeMemoryManager.INSTANCE.freeMemory(baseBlock);
+      UnsafeSortMemoryManager.INSTANCE.freeMemory(this.taskId, pointerMemoryBlock);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 28e63a9..991bc90 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -17,7 +17,10 @@
 
 package org.apache.carbondata.core.memory;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -36,23 +39,23 @@ public class UnsafeMemoryManager {
   private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
           CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+  private static Map<Long,Set<MemoryBlock>> taskIdToMemoryBlockMap;
   static {
     long size;
     try {
       size = Long.parseLong(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
-              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
+          .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
+              CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT));
     } catch (Exception e) {
       size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
       LOGGER.info("Wrong memory size given, "
           + "so setting default value to " + size);
     }
-    if (size < 1024) {
-      size = 1024;
-      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
+    if (size < 512) {
+      size = 512;
+      LOGGER.info("It is not recommended to keep unsafe memory size less than 512MB, "
           + "so setting default value to " + size);
     }
-
     long takenSize = size * 1024 * 1024;
     MemoryAllocator allocator;
     if (offHeap) {
@@ -65,6 +68,7 @@ public class UnsafeMemoryManager {
       allocator = MemoryAllocator.HEAP;
     }
     INSTANCE = new UnsafeMemoryManager(takenSize, allocator);
+    taskIdToMemoryBlockMap = new HashMap<>();
   }
 
   public static final UnsafeMemoryManager INSTANCE;
@@ -75,76 +79,91 @@ public class UnsafeMemoryManager {
 
   private MemoryAllocator allocator;
 
-  private long minimumMemory;
-
-  // for debug purpose
-  private Set<MemoryBlock> set = new HashSet<>();
-
   private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
     this.totalMemory = totalMemory;
     this.allocator = allocator;
-    long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
-    long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-    sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
-    long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
-    if (totalWorkingMemoryForAllThreads >= totalMemory) {
-      throw new RuntimeException("Working memory should be less than total memory configured, "
-          + "so either reduce the loading threads or increase the memory size. "
-          + "(Number of threads * number of threads) should be less than total unsafe memory");
-    }
-    minimumMemory = totalWorkingMemoryForAllThreads;
-    LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
-        + " and minimum reserve memory " + minimumMemory);
+    LOGGER
+        .info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
   }
 
-  private synchronized MemoryBlock allocateMemory(long memoryRequested) {
+  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
     if (memoryUsed + memoryRequested <= totalMemory) {
       MemoryBlock allocate = allocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
       if (LOGGER.isDebugEnabled()) {
-        set.add(allocate);
-        LOGGER.error("Memory block (" + allocate + ") is created with size "  + allocate.size() +
-            ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
+        LOGGER.debug(
+            "Working Memory block (" + allocate + ") is created with size " + allocate.size()
+                + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+                + "Bytes");
+      }
+      Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
+      if (null == listOfMemoryBlock) {
+        listOfMemoryBlock = new HashSet<>();
+        taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
       }
+      listOfMemoryBlock.add(allocate);
       return allocate;
     }
     return null;
   }
 
-  public synchronized void freeMemory(MemoryBlock memoryBlock) {
+  public synchronized void freeMemory(long taskId,MemoryBlock memoryBlock) {
+    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
     allocator.free(memoryBlock);
     memoryUsed -= memoryBlock.size();
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
     if (LOGGER.isDebugEnabled()) {
-      set.remove(memoryBlock);
-      LOGGER.error("Memory block (" + memoryBlock + ") released. Total memory used " + memoryUsed +
-          "Bytes, left " + getAvailableMemory() + "Bytes. Total allocated block: " + set.size());
+      LOGGER.debug(
+          "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + (
+              totalMemory - memoryUsed));
     }
   }
 
-  private synchronized long getAvailableMemory() {
-    return totalMemory - memoryUsed;
+  public void freeMemoryAll(long taskId) {
+    Set<MemoryBlock> memoryBlockSet = null;
+    synchronized (INSTANCE) {
+      memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId);
+    }
+    long occuppiedMemory = 0;
+    if (null != memoryBlockSet) {
+      Iterator<MemoryBlock> iterator = memoryBlockSet.iterator();
+      MemoryBlock memoryBlock = null;
+      while (iterator.hasNext()) {
+        memoryBlock = iterator.next();
+        occuppiedMemory += memoryBlock.size();
+        allocator.free(memoryBlock);
+      }
+    }
+    synchronized (INSTANCE) {
+      memoryUsed -= occuppiedMemory;
+      memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + (
+              totalMemory - memoryUsed));
+    }
   }
 
-  public boolean isMemoryAvailable() {
-    return getAvailableMemory() > minimumMemory;
+  public synchronized boolean isMemoryAvailable() {
+    return memoryUsed > totalMemory;
   }
 
   public long getUsableMemory() {
-    return totalMemory - minimumMemory;
+    return totalMemory;
   }
 
   /**
    * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
    */
-  public static MemoryBlock allocateMemoryWithRetry(long size) throws MemoryException {
+  public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException {
     MemoryBlock baseBlock = null;
     int tries = 0;
-    while (tries < 100) {
-      baseBlock = INSTANCE.allocateMemory(size);
+    while (tries < 300) {
+      baseBlock = INSTANCE.allocateMemory(taskId, size);
       if (baseBlock == null) {
         try {
-          Thread.sleep(50);
+          Thread.sleep(500);
         } catch (InterruptedException e) {
           throw new MemoryException(e);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
new file mode 100644
index 0000000..d975cd4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.memory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Memory manager to keep track of
+ * all memory for storing the sorted data
+ */
+public class UnsafeSortMemoryManager {
+
+  /**
+   * logger
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName());
+
+  /**
+   * offheap is enabled
+   */
+  private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+
+  /**
+   * map to keep taskid to memory blocks
+   */
+  private static Map<Long, Set<MemoryBlock>> taskIdToMemoryBlockMap;
+
+  /**
+   * singleton instance
+   */
+  public static final UnsafeSortMemoryManager INSTANCE;
+
+  /**
+   * total memory available for sort data storage
+   */
+  private long totalMemory;
+
+  /**
+   * current memory used
+   */
+  private long memoryUsed;
+
+  /**
+   * current memory allocator
+   */
+  private MemoryAllocator allocator;
+
+  static {
+    long size;
+    try {
+      size = Long.parseLong(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
+              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
+    } catch (Exception e) {
+      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
+      LOGGER.info("Wrong memory size given, " + "so setting default value to " + size);
+    }
+    if (size < 1024) {
+      size = 1024;
+      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
+          + "so setting default value to " + size);
+    }
+
+    long takenSize = size * 1024 * 1024;
+    MemoryAllocator allocator;
+    if (offHeap) {
+      allocator = MemoryAllocator.UNSAFE;
+    } else {
+      long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
+      if (takenSize > maxMemory) {
+        takenSize = maxMemory;
+      }
+      allocator = MemoryAllocator.HEAP;
+    }
+    INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator);
+    taskIdToMemoryBlockMap = new HashMap<>();
+  }
+
+  private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) {
+    this.totalMemory = totalMemory;
+    this.allocator = allocator;
+    LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator);
+  }
+
+  /**
+   * Below method will be used to check whether memory required is
+   * available or not
+   *
+   * @param required
+   * @return if memory available
+   */
+  public synchronized boolean isMemoryAvailable(long required) {
+    return memoryUsed + required < totalMemory;
+  }
+
+  /**
+   * Below method will be used to allocate dummy memory
+   * this will be used to allocate first and then used when u need
+   *
+   * @param size
+   */
+  public synchronized void allocateDummyMemory(long size) {
+    memoryUsed += size;
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Working Memory block (" + size + ") is created with size " + size
+          + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+          + "Bytes");
+    }
+  }
+
+  public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
+    allocator.free(memoryBlock);
+    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
+    memoryUsed -= memoryBlock.size();
+    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + (
+              totalMemory - memoryUsed));
+    }
+  }
+
+  /**
+   * Below method will be used to free all the
+   * memory occupied for a task, this will be useful
+   * when in case of task failure we need to clear all the memory occupied
+   * @param taskId
+   */
+  public void freeMemoryAll(long taskId) {
+    Set<MemoryBlock> memoryBlockSet = null;
+    synchronized (INSTANCE) {
+      memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId);
+    }
+    long occuppiedMemory = 0;
+    if (null != memoryBlockSet) {
+      Iterator<MemoryBlock> iterator = memoryBlockSet.iterator();
+      MemoryBlock memoryBlock = null;
+      while (iterator.hasNext()) {
+        memoryBlock = iterator.next();
+        occuppiedMemory += memoryBlock.size();
+        allocator.free(memoryBlock);
+      }
+    }
+    synchronized (INSTANCE) {
+      memoryUsed -= occuppiedMemory;
+      memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + (
+              totalMemory - memoryUsed));
+    }
+  }
+
+  /**
+   * Before calling this method caller should call allocateMemoryDummy
+   * This method will be used to allocate the memory, this can be used
+   * when caller wants to allocate memory first and used it anytime
+   * @param taskId
+   * @param memoryRequested
+   * @return memory block
+   */
+  public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) {
+    MemoryBlock allocate = allocator.allocate(memoryRequested);
+    Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
+    if (null == listOfMemoryBlock) {
+      listOfMemoryBlock = new HashSet<>();
+      taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
+    }
+    listOfMemoryBlock.add(allocate);
+    return allocate;
+  }
+
+  /**
+   * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
+   */
+  public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException {
+    MemoryBlock baseBlock = null;
+    int tries = 0;
+    while (tries < 100) {
+      baseBlock = INSTANCE.allocateMemory(taskId, size);
+      if (baseBlock == null) {
+        try {
+          Thread.sleep(50);
+        } catch (InterruptedException e) {
+          throw new MemoryException(e);
+        }
+      } else {
+        break;
+      }
+      tries++;
+    }
+    if (baseBlock == null) {
+      throw new MemoryException("Not enough memory");
+    }
+    return baseBlock;
+  }
+
+  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
+    if (memoryUsed + memoryRequested <= totalMemory) {
+      MemoryBlock allocate = allocator.allocate(memoryRequested);
+      memoryUsed += allocate.size();
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Working Memory block (" + allocate.size() + ") is created with size " + allocate.size()
+                + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+                + "Bytes");
+      }
+      Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
+      if (null == listOfMemoryBlock) {
+        listOfMemoryBlock = new HashSet<>();
+        taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
+      }
+      listOfMemoryBlock.add(allocate);
+      return allocate;
+    }
+    return null;
+  }
+
+  public static boolean isOffHeap() {
+    return offHeap;
+  }
+}
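
UnsafeSortMemoryManager introduces a reserve-then-materialize pattern: a caller first checks isMemoryAvailable and books the quota with allocateDummyMemory, and only after sorting copies the page into a block obtained from allocateMemoryLazy, which does not count the size a second time. UnsafeSortDataRows below follows this flow; a condensed sketch, with taskId and workingBlock as hypothetical inputs:

    // Sketch only: hand a sorted working-memory page over to the sort memory pool.
    import org.apache.carbondata.core.memory.CarbonUnsafe;
    import org.apache.carbondata.core.memory.MemoryBlock;
    import org.apache.carbondata.core.memory.UnsafeMemoryManager;
    import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;

    public class SortMemoryHandover {
      // returns false when the page has to be spilled to a sort temp file instead
      public static boolean moveToSortMemory(long taskId, MemoryBlock workingBlock) {
        long size = workingBlock.size();
        if (!UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(size)) {
          return false;
        }
        // book the quota up front; the real block is created lazily after sorting
        UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(size);
        MemoryBlock sortBlock = UnsafeSortMemoryManager.INSTANCE.allocateMemoryLazy(taskId, size);
        CarbonUnsafe.unsafe.copyMemory(workingBlock.getBaseObject(), workingBlock.getBaseOffset(),
            sortBlock.getBaseObject(), sortBlock.getBaseOffset(), size);
        // the copy now lives in sort memory, so the working-memory block can be released
        UnsafeMemoryManager.INSTANCE.freeMemory(taskId, workingBlock);
        return true;
      }
    }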

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ff54673..faa4564 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -62,6 +63,7 @@ import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -519,6 +521,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     if (null != queryIterator) {
       queryIterator.close();
     }
+    UnsafeMemoryManager.INSTANCE.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
     if (null != queryProperties.executorService) {
       queryProperties.executorService.shutdown();
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java
new file mode 100644
index 0000000..d3e4d7a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.Serializable;
+
+/**
+ * Value object to keep track of all the thread local variable
+ */
+public class CarbonTaskInfo implements Serializable {
+
+  /**
+   * serial version id
+   */
+  private static final long serialVersionUID = 1L;
+
+  public long taskId;
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(long taskId) {
+    this.taskId = taskId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java
new file mode 100644
index 0000000..8c871b8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+/**
+ * Class to keep all the thread local variable for task
+ */
+public class ThreadLocalTaskInfo {
+  static final InheritableThreadLocal<CarbonTaskInfo> threadLocal =
+      new InheritableThreadLocal<CarbonTaskInfo>();
+
+  public static void setCarbonTaskInfo(CarbonTaskInfo carbonTaskInfo) {
+    threadLocal.set(carbonTaskInfo);
+  }
+
+  public static CarbonTaskInfo getCarbonTaskInfo() {
+    return threadLocal.get();
+  }
+}
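
The task id travels through an InheritableThreadLocal, so worker threads spawned inside a task (for example the sort-and-write threads in UnsafeSortDataRows) inherit the same id and charge their allocations to the same bucket. A minimal sketch of registering a task before any unsafe work, mirroring CarbonRDD.compute in the Scala diff below:

    // Sketch only: register a task id for the current thread and its child threads.
    import org.apache.carbondata.core.util.CarbonTaskInfo;
    import org.apache.carbondata.core.util.ThreadLocalTaskInfo;

    public class TaskRegistrationExample {
      public static long registerCurrentTask() {
        CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
        // the commit uses System.nanoTime() as a unique-enough id per Spark task
        carbonTaskInfo.setTaskId(System.nanoTime());
        ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
        return ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
      }
    }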

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 106a9fd..48e97ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
-import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, CarbonTaskInfo, SessionParams, ThreadLocalSessionInfo, ThreadLocalTaskInfo}
 
 /**
  * This RDD maintains session level ThreadLocal
@@ -41,6 +41,9 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+    val carbonTaskInfo = new CarbonTaskInfo
+    carbonTaskInfo.setTaskId(System.nanoTime)
+    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
     internalCompute(split, context)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index d325f71..ac1c723 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
 import org.apache.carbondata.processing.csvload.BlockDetails
 import org.apache.carbondata.processing.csvload.CSVInputFormat
 import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator
@@ -240,7 +240,11 @@ class NewCarbonDataLoadRDD[K, V](
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()
-        new DataLoadExecutor().execute(model,
+        val executor = new DataLoadExecutor()
+        // in case of success, failure or cancelation clear memory and stop execution
+        context.addTaskCompletionListener { context => executor.close()
+          CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+        executor.execute(model,
           loader.storeLocation,
           recordReaders)
       } catch {
@@ -327,7 +331,6 @@ class NewCarbonDataLoadRDD[K, V](
           }
         }
       }
-
       /**
        * generate blocks id
        *
@@ -423,7 +426,6 @@ class NewDataFrameLoaderRDD[K, V](
           recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
               carbonLoadModel, context)
         }
-
         val loader = new SparkPartitionLoader(model,
           theSplit.index,
           null,
@@ -431,7 +433,11 @@ class NewDataFrameLoaderRDD[K, V](
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()
-        new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
+        val executor = new DataLoadExecutor
+        // in case of success, failure or cancelation clear memory and stop execution
+        context.addTaskCompletionListener { context => executor.close()
+          CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+        executor.execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: BadRecordFoundException =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
@@ -606,7 +612,6 @@ class PartitionTableDataLoaderRDD[K, V](
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
         carbonLoadModel.setPreFetch(false)
-
         val recordReaders = Array[CarbonIterator[Array[AnyRef]]] {
           new NewRddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel, context)
         }
@@ -618,7 +623,11 @@ class PartitionTableDataLoaderRDD[K, V](
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()
-        new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
+        val executor = new DataLoadExecutor
+        // in case of success, failure or cancelation clear memory and stop execution
+        context.addTaskCompletionListener { context => executor.close()
+          CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+        executor.execute(model, loader.storeLocation, recordReaders)
       } catch {
         case e: BadRecordFoundException =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
@@ -642,7 +651,6 @@ class PartitionTableDataLoaderRDD[K, V](
         }
       }
       var finished = false
-
       override def hasNext: Boolean = !finished
 
       override def next(): (K, V) = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index bb8c5a6..579347b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.spark.util
 
+
 import java.text.SimpleDateFormat
 import java.util
 
@@ -35,6 +36,7 @@ import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
@@ -615,8 +617,21 @@ object CommonUtil {
     result.result()
   }
 
-  def partitionInfoOutput: Seq[Attribute] = Seq(
-    AttributeReference("partition", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "partitions info").build())()
-  )
+  def partitionInfoOutput: Seq[Attribute] = {
+    Seq(
+      AttributeReference("partition", StringType, nullable = false,
+        new MetadataBuilder().putString("comment", "partitions info").build())()
+    )
+  }
+
+  /**
+   * Method to clear the memory for a task
+   * if present
+   */
+  def clearUnsafeMemory(taskId: Long) {
+    UnsafeMemoryManager.
+      INSTANCE.freeMemoryAll(taskId)
+    UnsafeSortMemoryManager.
+      INSTANCE.freeMemoryAll(taskId)
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
index e252e7f..3a6428d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
@@ -304,6 +304,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
         if (boundedInputStream != null) {
           boundedInputStream.close();
         }
+        if (null != csvParser) {
+          csvParser.stopParsing();
+        }
       } finally {
         reader = null;
         boundedInputStream = null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
index 66e6d37..d4e79f8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
@@ -33,9 +33,12 @@ public class DataLoadExecutor {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
 
+  private AbstractDataLoadProcessorStep loadProcessorStep;
+
+  private boolean isClosed;
+
   public void execute(CarbonLoadModel loadModel, String storeLocation,
       CarbonIterator<Object[]>[] inputIterators) throws Exception {
-    AbstractDataLoadProcessorStep loadProcessorStep = null;
     try {
       loadProcessorStep =
           new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
@@ -60,10 +63,6 @@ public class DataLoadExecutor {
     } finally {
       removeBadRecordKey(
           loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier());
-      if (loadProcessorStep != null) {
-        // 3. Close the step
-        loadProcessorStep.close();
-      }
     }
   }
 
@@ -91,4 +90,14 @@ public class DataLoadExecutor {
     String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
     BadRecordsLogger.removeBadRecordKey(badRecordLoggerKey);
   }
+
+  /**
+   * Method to clean all the resource
+   */
+  public void close() {
+    if (!isClosed && loadProcessorStep != null) {
+      loadProcessorStep.close();
+    }
+    isClosed = true;
+  }
 }
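
Closing the processor steps is now the caller's responsibility through the idempotent close(), and the Spark load RDDs above pair it with CommonUtil.clearUnsafeMemory in a task completion listener, so success, failure and cancellation all run the same cleanup. The equivalent cleanup, sketched in Java:

    // Sketch only: the cleanup registered by the load RDDs on task completion.
    import org.apache.carbondata.core.memory.UnsafeMemoryManager;
    import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
    import org.apache.carbondata.processing.newflow.DataLoadExecutor;

    public class LoadTaskCleanup {
      public static void onTaskCompletion(DataLoadExecutor executor, long taskId) {
        // close() is safe to call more than once; it closes the load steps at most once
        executor.close();
        // release everything the task still holds in both unsafe memory pools
        UnsafeMemoryManager.INSTANCE.freeMemoryAll(taskId);
        UnsafeSortMemoryManager.INSTANCE.freeMemoryAll(taskId);
      }
    }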

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index 2ac138b..9d2ee9a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -25,6 +25,8 @@ import java.util.Arrays;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -55,9 +57,13 @@ public class UnsafeCarbonRowPage {
 
   private boolean saveToDisk;
 
+  private MemoryManagerType managerType;
+
+  private long taskId;
+
   public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
       boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
-      MemoryBlock memoryBlock, boolean saveToDisk) {
+      MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
     this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
     this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
     this.dimensionSize = dimensionSize;
@@ -65,10 +71,12 @@ public class UnsafeCarbonRowPage {
     this.measureDataType = type;
     this.saveToDisk = saveToDisk;
     this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
-    buffer = new IntPointerBuffer(memoryBlock);
-    this.dataBlock = buffer.getBaseBlock();
+    this.taskId = taskId;
+    buffer = new IntPointerBuffer(this.taskId);
+    this.dataBlock = memoryBlock;
     // TODO Only using 98% of space for safe side.May be we can have different logic.
     sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
+    this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
   }
 
   public int addRow(Object[] row) {
@@ -324,7 +332,14 @@ public class UnsafeCarbonRowPage {
   }
 
   public void freeMemory() {
-    buffer.freeMemory();
+    switch (managerType) {
+      case UNSAFE_MEMORY_MANAGER:
+        UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
+        break;
+      default:
+        UnsafeSortMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
+        buffer.freeMemory();
+    }
   }
 
   public boolean isSaveToDisk() {
@@ -369,4 +384,13 @@ public class UnsafeCarbonRowPage {
   public boolean[] getNoDictionarySortColumnMapping() {
     return noDictionarySortColumnMapping;
   }
+
+  public void setNewDataBlock(MemoryBlock newMemoryBlock) {
+    this.dataBlock = newMemoryBlock;
+    this.managerType = MemoryManagerType.UNSAFE_SORT_MEMORY_MANAGER;
+  }
+
+  public enum MemoryManagerType {
+    UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index a42d0ea..8021b45 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -31,12 +31,15 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
 import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
@@ -86,6 +89,8 @@ public class UnsafeSortDataRows {
    */
   private Semaphore semaphore;
 
+  private final long taskId;
+
   public UnsafeSortDataRows(SortParameters parameters,
       UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
     this.parameters = parameters;
@@ -94,9 +99,9 @@ public class UnsafeSortDataRows {
 
     // observer of writing file in thread
     this.threadStatusObserver = new ThreadStatusObserver();
-
+    this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
     this.inMemoryChunkSize = inMemoryChunkSize;
-    this.inMemoryChunkSize = this.inMemoryChunkSize * 1024 * 1024;
+    this.inMemoryChunkSize = inMemoryChunkSize * 1024 * 1024;
     enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
             CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
@@ -115,12 +120,18 @@ public class UnsafeSortDataRows {
    * This method will be used to initialize
    */
   public void initialize() throws MemoryException {
-    MemoryBlock baseBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
+    MemoryBlock baseBlock =
+        UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+    boolean isMemoryAvailable =
+        UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
+    if (isMemoryAvailable) {
+      UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
+    }
     this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
         parameters.getNoDictionarySortColumn(),
         parameters.getDimColCount() + parameters.getComplexDimColCount(),
         parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
-        !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
+        !isMemoryAvailable, taskId);
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
 
@@ -177,8 +188,13 @@ public class UnsafeSortDataRows {
           unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
           semaphore.acquire();
           dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
-          MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
-          boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+          MemoryBlock memoryBlock =
+              UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+          boolean saveToDisk =
+              UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
+          if (!saveToDisk) {
+            UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
+          }
           rowPage = new UnsafeCarbonRowPage(
                   parameters.getNoDictionaryDimnesionColumn(),
                   parameters.getNoDictionarySortColumn(),
@@ -186,7 +202,7 @@ public class UnsafeSortDataRows {
                   parameters.getMeasureColCount(),
                   parameters.getMeasureDataType(),
                   memoryBlock,
-                  saveToDisk);
+                  saveToDisk, taskId);
           bytesAdded += rowPage.addRow(rowBatch[i]);
         } catch (Exception e) {
           LOGGER.error(
@@ -214,14 +230,18 @@ public class UnsafeSortDataRows {
         unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
         semaphore.acquire();
         dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
-        MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
-        boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+        MemoryBlock memoryBlock =
+            UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+        boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
+        if (!saveToDisk) {
+          UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
+        }
         rowPage = new UnsafeCarbonRowPage(
             parameters.getNoDictionaryDimnesionColumn(),
             parameters.getNoDictionarySortColumn(),
             parameters.getDimColCount(), parameters.getMeasureColCount(),
             parameters.getMeasureDataType(), memoryBlock,
-            saveToDisk);
+            saveToDisk, taskId);
         rowPage.addRow(row);
       } catch (Exception e) {
         LOGGER.error(
@@ -343,7 +363,7 @@ public class UnsafeSortDataRows {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
               new UnsafeRowComparatorForNormalDIms(page));
         }
-        if (rowPage.isSaveToDisk()) {
+        if (page.isSaveToDisk()) {
           // create a new file every time
           File sortTempFile = new File(
               parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System
@@ -356,6 +376,18 @@ public class UnsafeSortDataRows {
           // intermediate merging of sort temp files will be triggered
           unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
         } else {
+          // creating a new memory block as size is already allocated
+          // so calling lazy memory allocator
+          MemoryBlock newMemoryBlock = UnsafeSortMemoryManager.INSTANCE
+              .allocateMemoryLazy(taskId, page.getDataBlock().size());
+          // copying data from working memory manager to sortmemory manager
+          CarbonUnsafe.unsafe
+              .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(),
+                  newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(),
+                  page.getDataBlock().size());
+          // free unsafememory manager
+          page.freeMemory();
+          page.setNewDataBlock(newMemoryBlock);
           // add sort temp filename to and arrayList. When the list size reaches 20 then
           // intermediate merging of sort temp files will be triggered
           page.getBuffer().loadToUnsafe();