You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/30 10:39:14 UTC

[carbondata] 10/27: [CARBONDATA-3267]Fixed Range Sort OOM Issue

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 1bdfed088e3480d23fb416b14ba1a442ff43ae9b
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Tue Jan 22 22:37:26 2019 +0530

    [CARBONDATA-3267]Fixed Range Sort OOM Issue
    
    Problem:
    Range sort is failing with OOM.
    
    Root cause:
    UnsafeSortStorageMemory is not able to control the off-heap memory, so when huge data is loaded an OOM exception is thrown from UnsafeMemoryAllocator.allocate.
    
    Solution:
    Added code to control sort storage memory. After sorting the rows, add the sorted records to sort storage memory only if memory is available; otherwise write them to disk.
    
    This closes #3096
---
 .../carbondata/core/memory/IntPointerBuffer.java   | 10 +--
 .../core/memory/UnsafeSortMemoryManager.java       | 77 +++-------------------
 .../loading/sort/unsafe/UnsafeCarbonRowPage.java   |  9 +--
 .../loading/sort/unsafe/UnsafeSortDataRows.java    | 28 ++++----
 .../unsafe/merger/UnsafeIntermediateMerger.java    |  4 +-
 .../processing/sort/sortdata/SortParameters.java   |  1 -
 6 files changed, 29 insertions(+), 100 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
index c596b08..1f1c865 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
@@ -75,17 +75,17 @@ public class IntPointerBuffer {
   }
 
   public void loadToUnsafe() {
-    try {
-      pointerMemoryBlock =
-          UnsafeSortMemoryManager.allocateMemoryWithRetry(this.taskId, pointerBlock.length * 4);
+    pointerMemoryBlock =
+        UnsafeSortMemoryManager.INSTANCE.allocateMemory(this.taskId, pointerBlock.length * 4);
+    // null pointerMemoryBlock means sort storage memory manager has no space to load the pointer
+    // buffer, in that case keep using pointerBlock
+    if (null != pointerMemoryBlock) {
       for (int i = 0; i < pointerBlock.length; i++) {
         CarbonUnsafe.getUnsafe()
             .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
                 pointerBlock[i]);
       }
       pointerBlock = null;
-    } catch (MemoryException e) {
-      LOGGER.warn("Not enough memory for allocating pointer buffer, sorting in heap");
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
index 847f6e2..3c12733 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
@@ -112,17 +112,6 @@ public class UnsafeSortMemoryManager {
   }
 
   /**
-   * Below method will be used to check whether memory required is
-   * available or not
-   *
-   * @param required
-   * @return if memory available
-   */
-  public synchronized boolean isMemoryAvailable(long required) {
-    return memoryUsed + required < totalMemory;
-  }
-
-  /**
    * total usable memory for sort memory manager
    * @return size in bytes
    */
@@ -130,21 +119,6 @@ public class UnsafeSortMemoryManager {
     return totalMemory;
   }
 
-  /**
-   * Below method will be used to allocate dummy memory
-   * this will be used to allocate first and then used when u need
-   *
-   * @param size
-   */
-  public synchronized void allocateDummyMemory(long size) {
-    memoryUsed += size;
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(String.format(
-          "Sort Memory block is created with size %d. Total memory used %d Bytes, left %d Bytes",
-          size, memoryUsed, totalMemory - memoryUsed));
-    }
-  }
-
   public synchronized void freeMemory(String taskId, MemoryBlock memoryBlock) {
     if (taskIdToMemoryBlockMap.containsKey(taskId)) {
       taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
@@ -195,52 +169,17 @@ public class UnsafeSortMemoryManager {
   }
 
   /**
-   * Before calling this method caller should call allocateMemoryDummy
-   * This method will be used to allocate the memory, this can be used
-   * when caller wants to allocate memory first and used it anytime
-   * @param taskId
-   * @param memoryRequested
-   * @return memory block
-   */
-  public synchronized MemoryBlock allocateMemoryLazy(String taskId, long memoryRequested) {
-    MemoryBlock allocate = allocator.allocate(memoryRequested);
-    Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
-    if (null == listOfMemoryBlock) {
-      listOfMemoryBlock = new HashSet<>();
-      taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
-    }
-    listOfMemoryBlock.add(allocate);
-    return allocate;
-  }
-
-  /**
-   * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
+   * Below method will be used to check whether memory required is
+   * available or not
+   *
+   * @param required
+   * @return if memory available
    */
-  public static MemoryBlock allocateMemoryWithRetry(String taskId, long size)
-          throws MemoryException {
-    MemoryBlock baseBlock = null;
-    int tries = 0;
-    while (tries < 100) {
-      baseBlock = INSTANCE.allocateMemory(taskId, size);
-      if (baseBlock == null) {
-        try {
-          Thread.sleep(50);
-        } catch (InterruptedException e) {
-          throw new MemoryException(e);
-        }
-      } else {
-        break;
-      }
-      tries++;
-    }
-    if (baseBlock == null) {
-      throw new MemoryException("Not enough sort memory, please increase "
-          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB);
-    }
-    return baseBlock;
+  public synchronized boolean isMemoryAvailable(long required) {
+    return memoryUsed + required < totalMemory;
   }
 
-  private synchronized MemoryBlock allocateMemory(String taskId, long memoryRequested) {
+  public synchronized MemoryBlock allocateMemory(String taskId, long memoryRequested) {
     if (memoryUsed + memoryRequested <= totalMemory) {
       MemoryBlock allocate = allocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index a480cf7..21403b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -42,8 +42,6 @@ public class UnsafeCarbonRowPage {
 
   private MemoryBlock dataBlock;
 
-  private boolean saveToDisk;
-
   private MemoryManagerType managerType;
 
   private String taskId;
@@ -53,10 +51,9 @@ public class UnsafeCarbonRowPage {
   private boolean convertNoSortFields;
 
   public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
-      boolean saveToDisk, String taskId) {
+      String taskId) {
     this.tableFieldStat = tableFieldStat;
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
-    this.saveToDisk = saveToDisk;
     this.taskId = taskId;
     buffer = new IntPointerBuffer(this.taskId);
     this.dataBlock = memoryBlock;
@@ -126,10 +123,6 @@ public class UnsafeCarbonRowPage {
     }
   }
 
-  public boolean isSaveToDisk() {
-    return saveToDisk;
-  }
-
   public IntPointerBuffer getBuffer() {
     return buffer;
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 24faa51..e8e1c08 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -148,13 +148,10 @@ public class UnsafeSortDataRows {
         UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
     boolean isMemoryAvailable =
         UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
-    if (isMemoryAvailable) {
-      UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
-    } else {
-      // merge and spill in-memory pages to disk if memory is not enough
-      unsafeInMemoryIntermediateFileMerger.tryTriggerInmemoryMerging(true);
+    if (!isMemoryAvailable) {
+      unsafeInMemoryIntermediateFileMerger.tryTriggerInMemoryMerging(true);
     }
-    return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+    return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, taskId);
   }
 
   public boolean canAdd() {
@@ -382,7 +379,12 @@ public class UnsafeSortDataRows {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
               new UnsafeRowComparatorForNormalDims(page));
         }
-        if (page.isSaveToDisk()) {
+        // get sort storage memory block if memory is available in sort storage manager
+        // if space is available then store it in memory, if memory is not available
+        // then spill to disk
+        MemoryBlock sortStorageMemoryBlock =
+            UnsafeSortMemoryManager.INSTANCE.allocateMemory(taskId, page.getDataBlock().size());
+        if (null == sortStorageMemoryBlock) {
           // create a new file every time
           // create a new file and pick a temp directory randomly every time
           String tmpDir = parameters.getTempFileLocation()[
@@ -400,18 +402,14 @@ public class UnsafeSortDataRows {
           // intermediate merging of sort temp files will be triggered
           unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
         } else {
-          // creating a new memory block as size is already allocated
-          // so calling lazy memory allocator
-          MemoryBlock newMemoryBlock = UnsafeSortMemoryManager.INSTANCE
-              .allocateMemoryLazy(taskId, page.getDataBlock().size());
-          // copying data from working memory manager to sortmemory manager
+          // copying data from working memory manager block to storage memory manager block
           CarbonUnsafe.getUnsafe()
               .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(),
-                  newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(),
-                  page.getDataBlock().size());
+                  sortStorageMemoryBlock.getBaseObject(),
+                  sortStorageMemoryBlock.getBaseOffset(), page.getDataBlock().size());
           // free unsafememory manager
           page.freeMemory();
-          page.setNewDataBlock(newMemoryBlock);
+          page.setNewDataBlock(sortStorageMemoryBlock);
           // add sort temp filename to and arrayList. When the list size reaches 20 then
           // intermediate merging of sort temp files will be triggered
           page.getBuffer().loadToUnsafe();
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index f972f0c..1389ff7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -140,7 +140,7 @@ public class UnsafeIntermediateMerger {
     mergerTask.add(executorService.submit(merger));
   }
 
-  public void tryTriggerInmemoryMerging(boolean spillDisk)
+  public void tryTriggerInMemoryMerging(boolean spillDisk)
       throws CarbonSortKeyAndGroupByException {
     List<UnsafeCarbonRowPage> pages2Merge = new ArrayList<>();
     int totalRows2Merge = 0;
@@ -170,7 +170,7 @@ public class UnsafeIntermediateMerger {
 
   public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException {
     if (rowPages.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      tryTriggerInmemoryMerging(false);
+      tryTriggerInMemoryMerging(false);
     }
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 09dd52f..6fec8dc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -441,7 +441,6 @@ public class SortParameters implements Serializable {
 
     parameters.setTempFileLocation(sortTempDirs);
     LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
-
     int numberOfCores = 1;
     // In case of loading from partition we should use the cores specified by it
     if (configuration.getWritingCoresCount() > 0) {