You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/10 11:51:39 UTC

carbondata git commit: [HOTFIX] Fixed LRU cache bug to invalidate the cacheable object to clean up the resources

Repository: carbondata
Updated Branches:
  refs/heads/master 0528a7985 -> 54dcd8d5b


[HOTFIX] Fixed LRU cache bug to invalidate the cacheable object to clean up the resources

This PR contains

Fix for LRU cache bug to invalidate the Cacheable object while removing it from LRU cache. This will help in clearing the unsafe memory for cacheable objects like BlockDataMaps
Fix for setting the driver flag for sparkCarbonFileFormat, which will be used for taking the driver memory for LRU cache and unsafe memory initialization
Modified the logic for properties validation for unsafe working and sort memory. Sort memory will no longer consider the value of parameter sort.inmemory.size.inmb, as it was deprecated long ago. The memory configured for this parameter was previously divided in an 80:20 ratio between sort and working unsafe memory; that split is now removed. Now only the value of parameter carbon.sort.storage.inmemory.size.inmb will be considered.

This closes #2698


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/54dcd8d5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/54dcd8d5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/54dcd8d5

Branch: refs/heads/master
Commit: 54dcd8d5b1f6205523363454e8d197bb95652775
Parents: 0528a79
Author: manishgupta88 <to...@gmail.com>
Authored: Thu Sep 6 22:56:49 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Sep 10 17:21:26 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/cache/Cacheable.java |  7 +++
 .../carbondata/core/cache/CarbonLRUCache.java   |  8 +--
 .../AbstractColumnDictionaryInfo.java           |  4 ++
 .../core/datastore/block/AbstractIndex.java     |  4 ++
 .../indexstore/BlockletDataMapIndexWrapper.java |  8 +++
 .../core/memory/UnsafeMemoryManager.java        | 40 ++++++------
 .../carbondata/core/util/CarbonProperties.java  | 66 +++-----------------
 .../datamap/bloom/BloomCacheKeyValue.java       |  4 ++
 .../execution/datasources/CarbonFileIndex.scala |  6 ++
 9 files changed, 69 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java b/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java
index d8be12b..c58982d 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java
@@ -45,4 +45,11 @@ public interface Cacheable {
    * @return
    */
   long getMemorySize();
+
+  /**
+   * Method to be used for invalidating the cacheable object. API to be invoked at the time of
+   * removing the cacheable object from memory. Example at the of removing the cachebale object
+   * from LRU cache
+   */
+  void invalidate();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 03838a2..4a0c36c 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -155,10 +155,10 @@ public final class CarbonLRUCache {
   private void removeKey(String key) {
     Cacheable cacheable = lruCacheMap.get(key);
     if (null != cacheable) {
-      currentSize = currentSize - cacheable.getMemorySize();
-    }
-    Cacheable remove = lruCacheMap.remove(key);
-    if (null != remove) {
+      long memorySize = cacheable.getMemorySize();
+      cacheable.invalidate();
+      lruCacheMap.remove(key);
+      currentSize = currentSize - memorySize;
       LOGGER.info("Removed entry from InMemory lru cache :: " + key);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
index c138cc8..f5971a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
@@ -296,5 +296,9 @@ public abstract class AbstractColumnDictionaryInfo implements DictionaryInfo {
     byte[] keyData = value.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
     return getSurrogateKey(keyData);
   }
+
+  @Override public void invalidate() {
+
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
index 7fbef8a..1972e97 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
@@ -100,6 +100,10 @@ public abstract class AbstractIndex implements Cacheable {
     return this.memorySize;
   }
 
+  @Override public void invalidate() {
+
+  }
+
   /**
    * The method is used to set the access count
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
index b0fb13e..2cf0259 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
 
 /**
@@ -57,6 +58,13 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
     return wrapperSize;
   }
 
+  @Override public void invalidate() {
+    for (DataMap dataMap : dataMaps) {
+      dataMap.clear();
+    }
+    dataMaps = null;
+  }
+
   public List<BlockDataMap> getDataMaps() {
     return dataMaps;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 9133f0f..8fcbb6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -42,6 +42,7 @@ public class UnsafeMemoryManager {
   private static Map<Long,Set<MemoryBlock>> taskIdToMemoryBlockMap;
   static {
     long size = 0L;
+    String defaultWorkingMemorySize = null;
     try {
       // check if driver unsafe memory is configured and JVM process is in driver. In that case
       // initialize unsafe memory configured for driver
@@ -49,38 +50,41 @@ public class UnsafeMemoryManager {
           .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
       boolean initializedWithUnsafeDriverMemory = false;
       if (isDriver) {
-        String driverUnsafeMemorySize = CarbonProperties.getInstance()
+        defaultWorkingMemorySize = CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB);
-        if (null != driverUnsafeMemorySize) {
-          size = Long.parseLong(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB,
-                  CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT));
+        if (null != defaultWorkingMemorySize) {
+          size = Long.parseLong(defaultWorkingMemorySize);
           initializedWithUnsafeDriverMemory = true;
         }
       }
       if (!initializedWithUnsafeDriverMemory) {
-        size = Long.parseLong(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
-                CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT));
+        defaultWorkingMemorySize = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
+        if (null != defaultWorkingMemorySize) {
+          size = Long.parseLong(defaultWorkingMemorySize);
+        }
       }
     } catch (Exception e) {
-      size = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
-      LOGGER.info("Wrong memory size given, "
-          + "so setting default value to " + size);
-    }
-    if (size < 512) {
-      size = 512;
-      LOGGER.info("It is not recommended to keep unsafe memory size less than 512MB, "
-          + "so setting default value to " + size);
+      LOGGER.info("Invalid memory size value: " + defaultWorkingMemorySize);
     }
-    long takenSize = size * 1024 * 1024;
+    long takenSize = size;
     MemoryAllocator allocator;
     if (offHeap) {
       allocator = MemoryAllocator.UNSAFE;
+      long defaultSize = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
+      if (takenSize < defaultSize) {
+        takenSize = defaultSize;
+      }
+      takenSize = takenSize * 1024 * 1024;
     } else {
       long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
-      if (takenSize > maxMemory) {
+      if (takenSize == 0L) {
         takenSize = maxMemory;
+      } else {
+        takenSize = takenSize * 1024 * 1024;
+        if (takenSize > maxMemory) {
+          takenSize = maxMemory;
+        }
       }
       allocator = MemoryAllocator.HEAP;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 6305283..559320a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1294,73 +1294,27 @@ public final class CarbonProperties {
   }
 
   private void validateSortMemorySizeInMB() {
-    int sortMemorySizeInMBDefault =
-        Integer.parseInt(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
-    int sortMemorySizeInMB = 0;
     try {
-      sortMemorySizeInMB = Integer.parseInt(
-          carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB));
-    } catch (NumberFormatException e) {
-      LOGGER.warn(
-          "The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
-              + "is Invalid." + " Taking the default value."
-              + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
-      sortMemorySizeInMB = sortMemorySizeInMBDefault;
-    }
-    if (sortMemorySizeInMB < sortMemorySizeInMBDefault) {
-      LOGGER.warn(
-          "The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
-              + "is less than default value." + ". Taking the default value."
-              + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
-      sortMemorySizeInMB = sortMemorySizeInMBDefault;
-    }
-    String unsafeWorkingMemoryString =
-        carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
-    String unsafeSortStorageMemoryString =
-        carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB);
-    int workingMemory = 512;
-    int sortStorageMemory;
-    if (null == unsafeWorkingMemoryString && null == unsafeSortStorageMemoryString) {
-      workingMemory = workingMemory > ((sortMemorySizeInMB * 20) / 100) ?
-          workingMemory :
-          ((sortMemorySizeInMB * 20) / 100);
-      sortStorageMemory = sortMemorySizeInMB - workingMemory;
-      carbonProperties
-          .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, workingMemory + "");
-      carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
-          sortStorageMemory + "");
-    } else if (null != unsafeWorkingMemoryString && null == unsafeSortStorageMemoryString) {
+      int unsafeSortStorageMemoryString = Integer.parseInt(carbonProperties
+          .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
       carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
-          sortMemorySizeInMB + "");
-    } else if (null == unsafeWorkingMemoryString && null != unsafeSortStorageMemoryString) {
-      carbonProperties
-          .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, sortMemorySizeInMB + "");
+          unsafeSortStorageMemoryString + "");
+    } catch (NumberFormatException ne) {
+      LOGGER.warn("The specified value for property "
+          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid.");
     }
   }
 
   private void validateWorkingMemory() {
-    int unsafeWorkingMemoryDefault =
-        Integer.parseInt(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
-    int unsafeWorkingMemory = 0;
     try {
-      unsafeWorkingMemory = Integer.parseInt(
+      int unsafeWorkingMemory = Integer.parseInt(
           carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB));
+      carbonProperties
+          .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, unsafeWorkingMemory + "");
     } catch (NumberFormatException e) {
       LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid."
-          + " Taking the default value."
-          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
-      unsafeWorkingMemory = unsafeWorkingMemoryDefault;
-    }
-    if (unsafeWorkingMemory < unsafeWorkingMemoryDefault) {
-      LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT
-          + "is less than the default value." + ". Taking the default value."
-          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
-      unsafeWorkingMemory = unsafeWorkingMemoryDefault;
+          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid.");
     }
-    carbonProperties
-        .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, unsafeWorkingMemory + "");
   }
 
   private void validateSortStorageMemory() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
index 29e94d8..a66ee63 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
@@ -100,6 +100,10 @@ public class BloomCacheKeyValue {
       return size;
     }
 
+    @Override public void invalidate() {
+      bloomFilters = null;
+    }
+
     public List<CarbonBloomFilter> getBloomFilters() {
       return bloomFilters;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54dcd8d5/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index d970892..af05613 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -30,10 +30,12 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.{AtomicType, StructType}
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
 import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
@@ -79,6 +81,10 @@ class CarbonFileIndex(
 
   private def prune(dataFilters: Seq[Expression],
       directories: Seq[PartitionDirectory]): Seq[PartitionDirectory] = {
+    // set the driver flag to true which will used for unsafe memory initialization and carbon LRU
+    // cache instance initialization as per teh driver memory
+    CarbonProperties.getInstance
+      .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
     val tablePath = parameters.get("path")
     if (tablePath.nonEmpty && dataFilters.nonEmpty) {
       val hadoopConf = sparkSession.sessionState.newHadoopConf()