You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/10/04 09:05:25 UTC

carbondata git commit: [CARBONDATA-2990] Fixed JVM crash when rebuilding bloom datamap

Repository: carbondata
Updated Branches:
  refs/heads/master c3a870449 -> 8fbd4a5f5


[CARBONDATA-2990] Fixed JVM crash when rebuilding bloom datamap

Problem: While rebuilding the datamap, the task accesses the datamap store, so it builds the datamap and stores it in unsafe on-heap storage. But when the
reader is closed, it frees all memory acquired during that task. Because the acquired memory is on-heap but is released with the off-heap allocator, the JVM crashes.

Solution: Record the type of memory acquired (on-heap or off-heap) in the memory block itself, and when releasing a block, obtain the allocator that matches its memory type and free it with that allocator.

This closes #2793


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8fbd4a5f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8fbd4a5f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8fbd4a5f

Branch: refs/heads/master
Commit: 8fbd4a5f53070b3755f1f573b09e0066fa93a6ea
Parents: c3a8704
Author: ravipesala <ra...@gmail.com>
Authored: Sun Sep 30 11:27:57 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu Oct 4 14:39:25 2018 +0530

----------------------------------------------------------------------
 .../core/indexstore/UnsafeMemoryDMStore.java    | 14 +++----
 .../core/memory/HeapMemoryAllocator.java        |  5 ++-
 .../carbondata/core/memory/MemoryBlock.java     | 14 ++++++-
 .../carbondata/core/memory/MemoryType.java      | 23 ++++++++++
 .../core/memory/UnsafeMemoryAllocator.java      |  2 +-
 .../core/memory/UnsafeMemoryManager.java        | 44 +++++++++++---------
 6 files changed, 70 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 196559a..0db1b0a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -19,9 +19,9 @@ package org.apache.carbondata.core.indexstore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
-import org.apache.carbondata.core.memory.MemoryAllocator;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.MemoryType;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -51,7 +51,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
   public UnsafeMemoryDMStore() throws MemoryException {
     this.allocatedSize = capacity;
     this.memoryBlock =
-        UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize);
+        UnsafeMemoryManager.allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, allocatedSize);
     this.pointers = new int[1000];
   }
 
@@ -74,10 +74,10 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
 
   private void increaseMemory(int requiredMemory) throws MemoryException {
     MemoryBlock newMemoryBlock = UnsafeMemoryManager
-        .allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize + requiredMemory);
+        .allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, allocatedSize + requiredMemory);
     getUnsafe().copyMemory(this.memoryBlock.getBaseObject(), this.memoryBlock.getBaseOffset(),
         newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(), runningLength);
-    UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, this.memoryBlock);
+    UnsafeMemoryManager.INSTANCE.freeMemory(taskId, this.memoryBlock);
     allocatedSize = allocatedSize + requiredMemory;
     this.memoryBlock = newMemoryBlock;
   }
@@ -190,10 +190,10 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
   public void finishWriting() throws MemoryException {
     if (runningLength < allocatedSize) {
       MemoryBlock allocate =
-          UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, runningLength);
+          UnsafeMemoryManager.allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, runningLength);
       getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
           allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-      UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
       memoryBlock = allocate;
     }
     // Compact pointers.
@@ -206,7 +206,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
 
   public void freeMemory() {
     if (!isMemoryFreed) {
-      UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
       isMemoryFreed = true;
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
index d08f803..58162da 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
@@ -71,7 +71,8 @@ public class HeapMemoryAllocator implements MemoryAllocator {
             final long[] array = arrayReference.get();
             if (array != null) {
               assert (array.length * 8L >= size);
-              MemoryBlock memory = new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size);
+              MemoryBlock memory =
+                  new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size, MemoryType.ONHEAP);
               // reuse this MemoryBlock
               memory.setFreedStatus(false);
               return memory;
@@ -82,7 +83,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
       }
     }
     long[] array = new long[numWords];
-    return new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size);
+    return new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size, MemoryType.ONHEAP);
   }
 
   @Override public void free(MemoryBlock memory) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
index 418ef89..87ae982 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
@@ -31,12 +31,18 @@ public class MemoryBlock extends MemoryLocation {
   /**
    * whether freed or not
    */
-  private boolean isFreed = false;
+  private boolean isFreed;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
+  /**
+   * Whether it is offheap or onheap memory type
+   */
+  private MemoryType memoryType;
+
+  public MemoryBlock(@Nullable Object obj, long offset, long length, MemoryType memoryType) {
     super(obj, offset);
     this.length = length;
     this.isFreed = false;
+    this.memoryType = memoryType;
   }
 
   /**
@@ -53,4 +59,8 @@ public class MemoryBlock extends MemoryLocation {
   public void setFreedStatus(boolean freedStatus) {
     this.isFreed = freedStatus;
   }
+
+  public MemoryType getMemoryType() {
+    return memoryType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/MemoryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryType.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryType.java
new file mode 100644
index 0000000..63e20d6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+public enum MemoryType {
+
+  OFFHEAP, ONHEAP;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
index 67412ac..e596895 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
@@ -28,7 +28,7 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
     long address = CarbonUnsafe.getUnsafe().allocateMemory(size);
     // initializing memory with zero
     CarbonUnsafe.getUnsafe().setMemory(null, address, size, (byte) 0);
-    return new MemoryBlock(null, address, size);
+    return new MemoryBlock(null, address, size, MemoryType.OFFHEAP);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 703d57a..4efea1a 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -68,9 +68,9 @@ public class UnsafeMemoryManager {
       LOGGER.info("Invalid memory size value: " + defaultWorkingMemorySize);
     }
     long takenSize = size;
-    MemoryAllocator allocator;
+    MemoryType memoryType;
     if (offHeap) {
-      allocator = MemoryAllocator.UNSAFE;
+      memoryType = MemoryType.OFFHEAP;
       long defaultSize = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
       if (takenSize < defaultSize) {
         takenSize = defaultSize;
@@ -86,9 +86,9 @@ public class UnsafeMemoryManager {
           takenSize = maxMemory;
         }
       }
-      allocator = MemoryAllocator.HEAP;
+      memoryType = MemoryType.ONHEAP;
     }
-    INSTANCE = new UnsafeMemoryManager(takenSize, allocator);
+    INSTANCE = new UnsafeMemoryManager(takenSize, memoryType);
     taskIdToMemoryBlockMap = new HashMap<>();
   }
 
@@ -98,19 +98,19 @@ public class UnsafeMemoryManager {
 
   private long memoryUsed;
 
-  private MemoryAllocator allocator;
+  private MemoryType memoryType;
 
-  private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
+  private UnsafeMemoryManager(long totalMemory, MemoryType memoryType) {
     this.totalMemory = totalMemory;
-    this.allocator = allocator;
+    this.memoryType = memoryType;
     LOGGER
-        .info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
+        .info("Working Memory manager is created with size " + totalMemory + " with " + memoryType);
   }
 
-  private synchronized MemoryBlock allocateMemory(MemoryAllocator memoryAllocator, long taskId,
+  private synchronized MemoryBlock allocateMemory(MemoryType memoryType, long taskId,
       long memoryRequested) {
     if (memoryUsed + memoryRequested <= totalMemory) {
-      MemoryBlock allocate = memoryAllocator.allocate(memoryRequested);
+      MemoryBlock allocate = getMemoryAllocator(memoryType).allocate(memoryRequested);
       memoryUsed += allocate.size();
       Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
       if (null == listOfMemoryBlock) {
@@ -129,16 +129,11 @@ public class UnsafeMemoryManager {
   }
 
   public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
-    freeMemory(allocator, taskId, memoryBlock);
-  }
-
-  public synchronized void freeMemory(MemoryAllocator memoryAllocator, long taskId,
-      MemoryBlock memoryBlock) {
     if (taskIdToMemoryBlockMap.containsKey(taskId)) {
       taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
     }
     if (!memoryBlock.isFreedStatus()) {
-      memoryAllocator.free(memoryBlock);
+      getMemoryAllocator(memoryBlock.getMemoryType()).free(memoryBlock);
       memoryUsed -= memoryBlock.size();
       memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
       if (LOGGER.isDebugEnabled()) {
@@ -160,7 +155,7 @@ public class UnsafeMemoryManager {
         memoryBlock = iterator.next();
         if (!memoryBlock.isFreedStatus()) {
           occuppiedMemory += memoryBlock.size();
-          allocator.free(memoryBlock);
+          getMemoryAllocator(memoryBlock.getMemoryType()).free(memoryBlock);
         }
       }
     }
@@ -188,15 +183,15 @@ public class UnsafeMemoryManager {
    */
   public static MemoryBlock allocateMemoryWithRetry(long taskId, long size)
       throws MemoryException {
-    return allocateMemoryWithRetry(INSTANCE.allocator, taskId, size);
+    return allocateMemoryWithRetry(INSTANCE.memoryType, taskId, size);
   }
 
-  public static MemoryBlock allocateMemoryWithRetry(MemoryAllocator memoryAllocator, long taskId,
+  public static MemoryBlock allocateMemoryWithRetry(MemoryType memoryType, long taskId,
       long size) throws MemoryException {
     MemoryBlock baseBlock = null;
     int tries = 0;
     while (tries < 300) {
-      baseBlock = INSTANCE.allocateMemory(memoryAllocator, taskId, size);
+      baseBlock = INSTANCE.allocateMemory(memoryType, taskId, size);
       if (baseBlock == null) {
         try {
           LOGGER.info("Memory is not available, retry after 500 millis");
@@ -217,6 +212,15 @@ public class UnsafeMemoryManager {
     return baseBlock;
   }
 
+  private MemoryAllocator getMemoryAllocator(MemoryType memoryType) {
+    switch (memoryType) {
+      case ONHEAP:
+        return MemoryAllocator.HEAP;
+      default:
+        return MemoryAllocator.UNSAFE;
+    }
+  }
+
   public static boolean isOffHeap() {
     return offHeap;
   }