You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:13 UTC

[11/45] carbondata git commit: [HOTFIX] Fix NPE in LRU cache when entry from the same table is getting evicted to load another entry from same table

[HOTFIX] Fix NPE in LRU cache when entry from the same table is getting evicted to load another entry from same table

Problem
When driver LRU cache size is configured to a small value then on running concurrent queries sometimes while loading the block dataMap in LRU cache one of the dataMap entries from the same table is getting deleted because of shortage of space. Due to this in the flow after loading the dataMap cache NPE is thrown.
This is because when an cacheable entry is removed from LRU cache then invalidate is called on that cacheable entry to clear the unsafe memory used by that entry. Invalidate method makes the references null and clears the unsafe memory which leads to NPE when accessed again.

Solution
Currently dataMap cache uses unsafe offheap memory for datamap caching. To avoid this the code is modified to use unsafe with onheap so that JVM itself takes care of clearing the memory when required. We do not require to explicitly set the references to null.

This closes #2759


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a4f5300
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a4f5300
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a4f5300

Branch: refs/heads/branch-1.5
Commit: 2a4f53001058346843e0248c60fee2943087efc9
Parents: 5c0da31
Author: manishgupta88 <to...@gmail.com>
Authored: Tue Sep 25 19:21:08 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Sep 27 12:08:28 2018 +0530

----------------------------------------------------------------------
 .../indexstore/BlockletDataMapIndexWrapper.java  |  1 -
 .../core/indexstore/SafeMemoryDMStore.java       |  1 -
 .../core/indexstore/UnsafeMemoryDMStore.java     | 16 +++++++++-------
 .../indexstore/blockletindex/BlockDataMap.java   |  2 --
 .../core/memory/HeapMemoryAllocator.java         |  8 ++++++--
 .../core/memory/UnsafeMemoryManager.java         | 19 +++++++++++++++----
 .../util/AbstractDataFileFooterConverter.java    |  4 ++--
 7 files changed, 32 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
index 7b8a13b..33d69aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -70,7 +70,6 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
     for (DataMap dataMap : dataMaps) {
       dataMap.clear();
     }
-    dataMaps = null;
   }
 
   public List<BlockDataMap> getDataMaps() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
index 0b3d4d8..042790f 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
@@ -62,7 +62,6 @@ public class SafeMemoryDMStore extends AbstractMemoryDMStore {
     if (!isMemoryFreed) {
       if (null != dataMapRows) {
         dataMapRows.clear();
-        dataMapRows = null;
       }
       isMemoryFreed = true;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 3e8ce12..196559a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.indexstore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryAllocator;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
@@ -49,7 +50,8 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
 
   public UnsafeMemoryDMStore() throws MemoryException {
     this.allocatedSize = capacity;
-    this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
+    this.memoryBlock =
+        UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize);
     this.pointers = new int[1000];
   }
 
@@ -71,11 +73,11 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
   }
 
   private void increaseMemory(int requiredMemory) throws MemoryException {
-    MemoryBlock newMemoryBlock =
-        UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + requiredMemory);
+    MemoryBlock newMemoryBlock = UnsafeMemoryManager
+        .allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize + requiredMemory);
     getUnsafe().copyMemory(this.memoryBlock.getBaseObject(), this.memoryBlock.getBaseOffset(),
         newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(), runningLength);
-    UnsafeMemoryManager.INSTANCE.freeMemory(taskId, this.memoryBlock);
+    UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, this.memoryBlock);
     allocatedSize = allocatedSize + requiredMemory;
     this.memoryBlock = newMemoryBlock;
   }
@@ -188,10 +190,10 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
   public void finishWriting() throws MemoryException {
     if (runningLength < allocatedSize) {
       MemoryBlock allocate =
-          UnsafeMemoryManager.allocateMemoryWithRetry(taskId, runningLength);
+          UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, runningLength);
       getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
           allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock);
       memoryBlock = allocate;
     }
     // Compact pointers.
@@ -204,7 +206,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
 
   public void freeMemory() {
     if (!isMemoryFreed) {
-      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock);
       isMemoryFreed = true;
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 0cf9914..d7b7977 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -943,12 +943,10 @@ public class BlockDataMap extends CoarseGrainDataMap
   @Override public void clear() {
     if (memoryDMStore != null) {
       memoryDMStore.freeMemory();
-      memoryDMStore = null;
     }
     // clear task min/max unsafe memory
     if (null != taskSummaryDMStore) {
       taskSummaryDMStore.freeMemory();
-      taskSummaryDMStore = null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
index d898dc9..d08f803 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import javax.annotation.concurrent.GuardedBy;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
@@ -39,8 +40,11 @@ public class HeapMemoryAllocator implements MemoryAllocator {
 
   public HeapMemoryAllocator() {
     poolingThresholdBytes = CarbonProperties.getInstance().getHeapMemoryPoolingThresholdBytes();
-    // if set 'poolingThresholdBytes' to -1, it should not go through the pooling mechanism.
-    if (poolingThresholdBytes == -1) {
+    boolean isDriver = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
+    // if set 'poolingThresholdBytes' to -1 or the object creation call is in driver,
+    // it should not go through the pooling mechanism.
+    if (poolingThresholdBytes == -1 || isDriver) {
       shouldPooling = false;
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index e3593c5..703d57a 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -107,9 +107,10 @@ public class UnsafeMemoryManager {
         .info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
   }
 
-  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
+  private synchronized MemoryBlock allocateMemory(MemoryAllocator memoryAllocator, long taskId,
+      long memoryRequested) {
     if (memoryUsed + memoryRequested <= totalMemory) {
-      MemoryBlock allocate = allocator.allocate(memoryRequested);
+      MemoryBlock allocate = memoryAllocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
       Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
       if (null == listOfMemoryBlock) {
@@ -128,11 +129,16 @@ public class UnsafeMemoryManager {
   }
 
   public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
+    freeMemory(allocator, taskId, memoryBlock);
+  }
+
+  public synchronized void freeMemory(MemoryAllocator memoryAllocator, long taskId,
+      MemoryBlock memoryBlock) {
     if (taskIdToMemoryBlockMap.containsKey(taskId)) {
       taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
     }
     if (!memoryBlock.isFreedStatus()) {
-      allocator.free(memoryBlock);
+      memoryAllocator.free(memoryBlock);
       memoryUsed -= memoryBlock.size();
       memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
       if (LOGGER.isDebugEnabled()) {
@@ -182,10 +188,15 @@ public class UnsafeMemoryManager {
    */
   public static MemoryBlock allocateMemoryWithRetry(long taskId, long size)
       throws MemoryException {
+    return allocateMemoryWithRetry(INSTANCE.allocator, taskId, size);
+  }
+
+  public static MemoryBlock allocateMemoryWithRetry(MemoryAllocator memoryAllocator, long taskId,
+      long size) throws MemoryException {
     MemoryBlock baseBlock = null;
     int tries = 0;
     while (tries < 300) {
-      baseBlock = INSTANCE.allocateMemory(taskId, size);
+      baseBlock = INSTANCE.allocateMemory(memoryAllocator, taskId, size);
       if (baseBlock == null) {
         try {
           LOGGER.info("Memory is not available, retry after 500 millis");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index b1dd580..601ce50 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -295,8 +295,8 @@ public abstract class AbstractDataFileFooterConverter {
       blockletMinMaxFlag = blockletIndexList.get(i).getMinMaxIndex().getIsMinMaxSet();
       for (int j = 0; j < maxValue.length; j++) {
         // can be null for stores < 1.5.0 version
-        if (null != blockletMinMaxFlag && !blockletMinMaxFlag[i]) {
-          blockMinMaxFlag[i] = blockletMinMaxFlag[i];
+        if (null != blockletMinMaxFlag && !blockletMinMaxFlag[j]) {
+          blockMinMaxFlag[j] = blockletMinMaxFlag[j];
           currentMaxValue[j] = new byte[0];
           currentMinValue[j] = new byte[0];
           continue;