You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/27 02:08:01 UTC

carbondata git commit: [CARBONDATA-3047] Added fallback mechanism when offheap memory is not available in UnsafeMemoryManager

Repository: carbondata
Updated Branches:
  refs/heads/master 917f34421 -> bbbe47905


[CARBONDATA-3047] Added fallback mechanism when offheap memory is not available in UnsafeMemoryManager

Changes Proposed In this PR:
Currently when unsafe working memory is not available UnsafeMemoryManager is throwing MemoryException and killing the running task.
To make system more easier for the user now added fallback to heap when offheap memory is not available

This closes #2841


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bbbe4790
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bbbe4790
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bbbe4790

Branch: refs/heads/master
Commit: bbbe47905f90b518a6ca670848bd4370f3504e8e
Parents: 917f344
Author: kumarvishal09 <ku...@gmail.com>
Authored: Sat Oct 27 02:01:11 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Oct 27 07:37:36 2018 +0530

----------------------------------------------------------------------
 .../core/memory/UnsafeMemoryManager.java        | 94 +++++++-------------
 1 file changed, 34 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbbe4790/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index db0258f..7ccbc3f 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -41,7 +41,7 @@ public class UnsafeMemoryManager {
   private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
           CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
-  private static Map<String,Set<MemoryBlock>> taskIdToMemoryBlockMap;
+  private static Map<String,Set<MemoryBlock>> taskIdToOffheapMemoryBlockMap;
   static {
     long size = 0L;
     String configuredWorkingMemorySize = null;
@@ -67,7 +67,7 @@ public class UnsafeMemoryManager {
         }
       }
     } catch (Exception e) {
-      LOGGER.info("Invalid working memory size value: " + configuredWorkingMemorySize);
+      LOGGER.info("Invalid offheap working memory size value: " + configuredWorkingMemorySize);
     }
     long takenSize = size;
     MemoryType memoryType;
@@ -77,25 +77,18 @@ public class UnsafeMemoryManager {
       if (takenSize < defaultSize) {
         takenSize = defaultSize;
         LOGGER.warn(String.format(
-            "It is not recommended to set unsafe working memory size less than %sMB,"
+            "It is not recommended to set offheap working memory size less than %sMB,"
                 + " so setting default value to %d",
             CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT, defaultSize));
       }
       takenSize = takenSize * 1024 * 1024;
     } else {
-      long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
-      if (takenSize == 0L) {
-        takenSize = maxMemory;
-      } else {
-        takenSize = takenSize * 1024 * 1024;
-        if (takenSize > maxMemory) {
-          takenSize = maxMemory;
-        }
-      }
+      // For ON-HEAP case not considering any size as it will based on max memory(Xmx) given to
+      // JVM and JVM will take care of freeing the memory
       memoryType = MemoryType.ONHEAP;
     }
     INSTANCE = new UnsafeMemoryManager(takenSize, memoryType);
-    taskIdToMemoryBlockMap = new HashMap<>();
+    taskIdToOffheapMemoryBlockMap = new HashMap<>();
   }
 
   public static final UnsafeMemoryManager INSTANCE;
@@ -109,50 +102,57 @@ public class UnsafeMemoryManager {
   private UnsafeMemoryManager(long totalMemory, MemoryType memoryType) {
     this.totalMemory = totalMemory;
     this.memoryType = memoryType;
-    LOGGER.info(
-        "Working Memory manager is created with size " + totalMemory + " with " + memoryType);
+    LOGGER.info("offheap Working Memory manager is created with size " + totalMemory + " with "
+        + memoryType);
   }
 
   private synchronized MemoryBlock allocateMemory(MemoryType memoryType, String taskId,
       long memoryRequested) {
-    if (memoryUsed + memoryRequested <= totalMemory) {
-      MemoryBlock memoryBlock = getMemoryAllocator(memoryType).allocate(memoryRequested);
+    MemoryBlock memoryBlock;
+    if (memoryUsed + memoryRequested <= totalMemory && memoryType == MemoryType.OFFHEAP) {
+      memoryBlock = MemoryAllocator.UNSAFE.allocate(memoryRequested);
       memoryUsed += memoryBlock.size();
-      Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
+      Set<MemoryBlock> listOfMemoryBlock = taskIdToOffheapMemoryBlockMap.get(taskId);
       if (null == listOfMemoryBlock) {
         listOfMemoryBlock = new HashSet<>();
-        taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
+        taskIdToOffheapMemoryBlockMap.put(taskId, listOfMemoryBlock);
       }
       listOfMemoryBlock.add(memoryBlock);
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(String.format("Creating working Memory block (%s) with size %d."
+        LOGGER.debug(String.format("Creating Offheap working Memory block (%s) with size %d."
                 + " Total memory used %d Bytes, left %d Bytes.",
             memoryBlock.toString(), memoryBlock.size(), memoryUsed, totalMemory - memoryUsed));
       }
-      return memoryBlock;
+    } else {
+      // not adding on heap memory block to map as JVM will take care of freeing the memory
+      memoryBlock = MemoryAllocator.HEAP.allocate(memoryRequested);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            String.format("Creating onheap working Memory block (%s) with size:", memoryBlock));
+      }
     }
-    return null;
+    return memoryBlock;
   }
 
   public synchronized void freeMemory(String taskId, MemoryBlock memoryBlock) {
-    if (taskIdToMemoryBlockMap.containsKey(taskId)) {
-      taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
+    if (taskIdToOffheapMemoryBlockMap.containsKey(taskId)) {
+      taskIdToOffheapMemoryBlockMap.get(taskId).remove(memoryBlock);
     }
     if (!memoryBlock.isFreedStatus()) {
       getMemoryAllocator(memoryBlock.getMemoryType()).free(memoryBlock);
       memoryUsed -= memoryBlock.size();
       memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(String.format(
-            "Freeing working memory block (%s) with size: %d, current available memory is: %d",
-            memoryBlock.toString(), memoryBlock.size(), totalMemory - memoryUsed));
+        LOGGER.debug(String.format("Freeing offheap working memory block (%s) with size: %d, "
+                + "current available memory is: %d", memoryBlock.toString(), memoryBlock.size(),
+            totalMemory - memoryUsed));
       }
     }
   }
 
   public synchronized void freeMemoryAll(String taskId) {
-    Set<MemoryBlock> memoryBlockSet = null;
-    memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId);
+    Set<MemoryBlock> memoryBlockSet;
+    memoryBlockSet = taskIdToOffheapMemoryBlockMap.remove(taskId);
     long occuppiedMemory = 0;
     if (null != memoryBlockSet) {
       Iterator<MemoryBlock> iterator = memoryBlockSet.iterator();
@@ -169,12 +169,12 @@ public class UnsafeMemoryManager {
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug(String.format(
-          "Freeing working memory of size %d. Current available memory is %d",
+          "Freeing offheap working memory of size %d. Current available memory is %d",
           occuppiedMemory, totalMemory - memoryUsed));
     }
     LOGGER.info(String.format(
-        "Total working memory used after task %s is %d. Current running tasks are %s",
-        taskId, memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", ")));
+        "Total offheap working memory used after task %s is %d. Current running tasks are %s",
+        taskId, memoryUsed, StringUtils.join(taskIdToOffheapMemoryBlockMap.keySet(), ", ")));
   }
 
   public synchronized boolean isMemoryAvailable() {
@@ -194,29 +194,8 @@ public class UnsafeMemoryManager {
   }
 
   public static MemoryBlock allocateMemoryWithRetry(MemoryType memoryType, String taskId,
-      long size) throws MemoryException {
-    MemoryBlock baseBlock = null;
-    int tries = 0;
-    while (tries < 300) {
-      baseBlock = INSTANCE.allocateMemory(memoryType, taskId, size);
-      if (baseBlock == null) {
-        try {
-          LOGGER.info("Working memory is not available, retry after 500 ms");
-          Thread.sleep(500);
-        } catch (InterruptedException e) {
-          throw new MemoryException(e);
-        }
-      } else {
-        break;
-      }
-      tries++;
-    }
-    if (baseBlock == null) {
-      INSTANCE.printCurrentMemoryUsage();
-      throw new MemoryException("Not enough working memory, please increase "
-          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
-    }
-    return baseBlock;
+      long size) {
+    return INSTANCE.allocateMemory(memoryType, taskId, size);
   }
 
   private MemoryAllocator getMemoryAllocator(MemoryType memoryType) {
@@ -231,9 +210,4 @@ public class UnsafeMemoryManager {
   public static boolean isOffHeap() {
     return offHeap;
   }
-
-  private synchronized void printCurrentMemoryUsage() {
-    LOGGER.info(String.format("Working memory used %d, running tasks are %s",
-        memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", ")));
-  }
 }