You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/10/24 08:12:16 UTC

carbondata git commit: [CARBONDATA-3035] Optimize parameters for unsafe working and sort memory

Repository: carbondata
Updated Branches:
  refs/heads/master b21a6d49f -> c429cee16


[CARBONDATA-3035] Optimize parameters for unsafe working and sort memory

Add memory type in log message to distinguish them and fix trival bugs
for them

This closes #2844


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c429cee1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c429cee1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c429cee1

Branch: refs/heads/master
Commit: c429cee16b5b7e78e920608f5a8638cd01b91a58
Parents: b21a6d4
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Tue Oct 23 11:33:33 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Oct 24 16:11:41 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  4 +-
 .../core/memory/UnsafeMemoryManager.java        | 66 +++++++++++---------
 .../core/memory/UnsafeSortMemoryManager.java    | 49 ++++++++-------
 .../carbondata/core/util/CarbonProperties.java  | 34 +++-------
 4 files changed, 75 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b5e1e5d..c5f8335 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1628,9 +1628,9 @@ public final class CarbonCommonConstants {
   public static final String CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION_DEFAULT = "false";
 
   @CarbonProperty
-  public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB =
+  public static final String CARBON_SORT_STORAGE_INMEMORY_IN_MB =
       "carbon.sort.storage.inmemory.size.inmb";
-  public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT = "512";
+  public static final int CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT = 512;
 
   @CarbonProperty
   public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 6a69dfd..db0258f 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -43,7 +44,7 @@ public class UnsafeMemoryManager {
   private static Map<String,Set<MemoryBlock>> taskIdToMemoryBlockMap;
   static {
     long size = 0L;
-    String defaultWorkingMemorySize = null;
+    String configuredWorkingMemorySize = null;
     try {
       // check if driver unsafe memory is configured and JVM process is in driver. In that case
       // initialize unsafe memory configured for driver
@@ -51,22 +52,22 @@ public class UnsafeMemoryManager {
           .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
       boolean initializedWithUnsafeDriverMemory = false;
       if (isDriver) {
-        defaultWorkingMemorySize = CarbonProperties.getInstance()
+        configuredWorkingMemorySize = CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB);
-        if (null != defaultWorkingMemorySize) {
-          size = Long.parseLong(defaultWorkingMemorySize);
+        if (null != configuredWorkingMemorySize) {
+          size = Long.parseLong(configuredWorkingMemorySize);
           initializedWithUnsafeDriverMemory = true;
         }
       }
       if (!initializedWithUnsafeDriverMemory) {
-        defaultWorkingMemorySize = CarbonProperties.getInstance()
+        configuredWorkingMemorySize = CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
-        if (null != defaultWorkingMemorySize) {
-          size = Long.parseLong(defaultWorkingMemorySize);
+        if (null != configuredWorkingMemorySize) {
+          size = Long.parseLong(configuredWorkingMemorySize);
         }
       }
     } catch (Exception e) {
-      LOGGER.info("Invalid memory size value: " + defaultWorkingMemorySize);
+      LOGGER.info("Invalid working memory size value: " + configuredWorkingMemorySize);
     }
     long takenSize = size;
     MemoryType memoryType;
@@ -75,6 +76,10 @@ public class UnsafeMemoryManager {
       long defaultSize = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
       if (takenSize < defaultSize) {
         takenSize = defaultSize;
+        LOGGER.warn(String.format(
+            "It is not recommended to set unsafe working memory size less than %sMB,"
+                + " so setting default value to %d",
+            CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT, defaultSize));
       }
       takenSize = takenSize * 1024 * 1024;
     } else {
@@ -104,27 +109,27 @@ public class UnsafeMemoryManager {
   private UnsafeMemoryManager(long totalMemory, MemoryType memoryType) {
     this.totalMemory = totalMemory;
     this.memoryType = memoryType;
-    LOGGER
-        .info("Working Memory manager is created with size " + totalMemory + " with " + memoryType);
+    LOGGER.info(
+        "Working Memory manager is created with size " + totalMemory + " with " + memoryType);
   }
 
   private synchronized MemoryBlock allocateMemory(MemoryType memoryType, String taskId,
       long memoryRequested) {
     if (memoryUsed + memoryRequested <= totalMemory) {
-      MemoryBlock allocate = getMemoryAllocator(memoryType).allocate(memoryRequested);
-      memoryUsed += allocate.size();
+      MemoryBlock memoryBlock = getMemoryAllocator(memoryType).allocate(memoryRequested);
+      memoryUsed += memoryBlock.size();
       Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
       if (null == listOfMemoryBlock) {
         listOfMemoryBlock = new HashSet<>();
         taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
       }
-      listOfMemoryBlock.add(allocate);
+      listOfMemoryBlock.add(memoryBlock);
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Memory block (" + allocate + ") is created with size " + allocate.size()
-            + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
-            + "Bytes");
+        LOGGER.debug(String.format("Creating working Memory block (%s) with size %d."
+                + " Total memory used %d Bytes, left %d Bytes.",
+            memoryBlock.toString(), memoryBlock.size(), memoryUsed, totalMemory - memoryUsed));
       }
-      return allocate;
+      return memoryBlock;
     }
     return null;
   }
@@ -138,9 +143,9 @@ public class UnsafeMemoryManager {
       memoryUsed -= memoryBlock.size();
       memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Freeing memory of size: " + memoryBlock.size() + "available memory:  " + (totalMemory
-                - memoryUsed));
+        LOGGER.debug(String.format(
+            "Freeing working memory block (%s) with size: %d, current available memory is: %d",
+            memoryBlock.toString(), memoryBlock.size(), totalMemory - memoryUsed));
       }
     }
   }
@@ -163,12 +168,13 @@ public class UnsafeMemoryManager {
     memoryUsed -= occuppiedMemory;
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
     if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(
-          "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + (
-              totalMemory - memoryUsed));
+      LOGGER.debug(String.format(
+          "Freeing working memory of size %d. Current available memory is %d",
+          occuppiedMemory, totalMemory - memoryUsed));
     }
-    LOGGER.info("Total memory used after task " + taskId + " is " + memoryUsed
-        + " Current tasks running now are : " + taskIdToMemoryBlockMap.keySet());
+    LOGGER.info(String.format(
+        "Total working memory used after task %s is %d. Current running tasks are %s",
+        taskId, memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", ")));
   }
 
   public synchronized boolean isMemoryAvailable() {
@@ -195,7 +201,7 @@ public class UnsafeMemoryManager {
       baseBlock = INSTANCE.allocateMemory(memoryType, taskId, size);
       if (baseBlock == null) {
         try {
-          LOGGER.info("Memory is not available, retry after 500 millis");
+          LOGGER.info("Working memory is not available, retry after 500 ms");
           Thread.sleep(500);
         } catch (InterruptedException e) {
           throw new MemoryException(e);
@@ -207,8 +213,8 @@ public class UnsafeMemoryManager {
     }
     if (baseBlock == null) {
       INSTANCE.printCurrentMemoryUsage();
-      throw new MemoryException(
-          "Not enough memory. please increase carbon.unsafe.working.memory.in.mb");
+      throw new MemoryException("Not enough working memory, please increase "
+          + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
     }
     return baseBlock;
   }
@@ -227,7 +233,7 @@ public class UnsafeMemoryManager {
   }
 
   private synchronized void printCurrentMemoryUsage() {
-    LOGGER.error(
-        " Memory Used : " + memoryUsed + " Tasks running : " + taskIdToMemoryBlockMap.keySet());
+    LOGGER.info(String.format("Working memory used %d, running tasks are %s",
+        memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", ")));
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
index 8dcf915..847f6e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -75,17 +76,18 @@ public class UnsafeSortMemoryManager {
   static {
     long size;
     try {
-      size = Long.parseLong(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
-              CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT));
+      size = Long.parseLong(CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB));
     } catch (Exception e) {
-      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
-      LOGGER.info("Wrong memory size given, " + "so setting default value to " + size);
+      size = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT;
+      LOGGER.info("Wrong memory size given, so setting default value to " + size);
     }
-    if (size < 1024) {
-      size = 1024;
-      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
-          + "so setting default value to " + size);
+    if (size < CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT) {
+      size = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT;
+      LOGGER.warn(String.format(
+          "It is not recommended to set unsafe sort memory size less than %dMB,"
+              + " so setting default value to %d",
+          CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT, size));
     }
 
     long takenSize = size * 1024 * 1024;
@@ -137,9 +139,9 @@ public class UnsafeSortMemoryManager {
   public synchronized void allocateDummyMemory(long size) {
     memoryUsed += size;
     if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Working Memory block (" + size + ") is created with size " + size
-          + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
-          + "Bytes");
+      LOGGER.debug(String.format(
+          "Sort Memory block is created with size %d. Total memory used %d Bytes, left %d Bytes",
+          size, memoryUsed, totalMemory - memoryUsed));
     }
   }
 
@@ -152,9 +154,9 @@ public class UnsafeSortMemoryManager {
       memoryUsed -= memoryBlock.size();
       memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + (
-                totalMemory - memoryUsed));
+        LOGGER.debug(String.format(
+            "Freeing sort memory block (%s) with size: %d, current available memory is: %d",
+            memoryBlock.toString(), memoryBlock.size(), totalMemory - memoryUsed));
       }
     }
   }
@@ -184,9 +186,12 @@ public class UnsafeSortMemoryManager {
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug(
-          "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + (
-              totalMemory - memoryUsed));
+          String.format("Freeing sort memory of size: %d, current available memory is: %d",
+              occuppiedMemory, totalMemory - memoryUsed));
     }
+    LOGGER.info(String.format(
+        "Total sort memory used after task %s is %d. Current running tasks are: %s",
+        taskId, memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", ")));
   }
 
   /**
@@ -229,7 +234,8 @@ public class UnsafeSortMemoryManager {
       tries++;
     }
     if (baseBlock == null) {
-      throw new MemoryException("Not enough memory");
+      throw new MemoryException("Not enough sort memory, please increase "
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB);
     }
     return baseBlock;
   }
@@ -239,10 +245,9 @@ public class UnsafeSortMemoryManager {
       MemoryBlock allocate = allocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Working Memory block (" + allocate.size() + ") is created with size " + allocate.size()
-                + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
-                + "Bytes");
+        LOGGER.debug(String.format(
+            "Sort Memory block is created with size %d. Total memory used %d Bytes, left %d Bytes",
+            allocate.size(), memoryUsed, totalMemory - memoryUsed));
       }
       Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
       if (null == listOfMemoryBlock) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c429cee1/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index e6440b6..e6d48e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -258,7 +258,6 @@ public final class CarbonProperties {
     validateSortIntermediateFilesLimit();
     validateEnableAutoHandoff();
     validateSchedulerMinRegisteredRatio();
-    validateSortMemorySizeInMB();
     validateWorkingMemory();
     validateSortStorageMemory();
     validateEnableQueryStatistics();
@@ -1282,18 +1281,6 @@ public final class CarbonProperties {
     propertySet.addAll(externalPropertySet);
   }
 
-  private void validateSortMemorySizeInMB() {
-    try {
-      int unsafeSortStorageMemoryString = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
-      carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
-          unsafeSortStorageMemoryString + "");
-    } catch (NumberFormatException ne) {
-      LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid.");
-    }
-  }
-
   private void validateWorkingMemory() {
     try {
       int unsafeWorkingMemory = Integer.parseInt(
@@ -1307,27 +1294,26 @@ public final class CarbonProperties {
   }
 
   private void validateSortStorageMemory() {
-    int unsafeSortStorageMemoryDefault =
-        Integer.parseInt(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
     int unsafeSortStorageMemory = 0;
     try {
       unsafeSortStorageMemory = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
+          .getProperty(CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB));
     } catch (NumberFormatException e) {
       LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid."
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB + "is invalid."
           + " Taking the default value."
-          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
-      unsafeSortStorageMemory = unsafeSortStorageMemoryDefault;
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT);
+      unsafeSortStorageMemory = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT;
     }
-    if (unsafeSortStorageMemory < unsafeSortStorageMemoryDefault) {
+    if (unsafeSortStorageMemory
+        < CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT) {
       LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB
           + "is less than the default value." + " Taking the default value."
-          + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
-      unsafeSortStorageMemory = unsafeSortStorageMemoryDefault;
+          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT);
+      unsafeSortStorageMemory = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT;
     }
-    carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
+    carbonProperties.setProperty(CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB,
         unsafeSortStorageMemory + "");
   }