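You are viewing a plain-text version of this content. The canonical version is available at the original mailing-list archive link (the hyperlink was not preserved in this plain-text copy).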
Posted to commits@parquet.apache.org by dw...@apache.org on 2015/02/05 23:36:50 UTC

incubator-parquet-mr git commit: PARQUET-177: Added lower bound to memory manager resize

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 668d031d7 -> 05adc21b1


PARQUET-177: Added lower bound to memory manager resize

PARQUET-177

Author: Daniel Weeks <dw...@netflix.com>

Closes #115 from danielcweeks/memory-manager-limit and squashes the following commits:

b2e4708 [Daniel Weeks] Updated to base memory allocation off estimated chunk size
09d7aa3 [Daniel Weeks] Updated property name and default value
8f6cff1 [Daniel Weeks] Added low bound to memory manager resize


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/05adc21b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/05adc21b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/05adc21b

Branch: refs/heads/master
Commit: 05adc21b15dbe30d9bded0cde56f482f1c932d6f
Parents: 668d031
Author: Daniel Weeks <dw...@netflix.com>
Authored: Thu Feb 5 14:36:28 2015 -0800
Committer: Daniel Weeks <dw...@netflix.com>
Committed: Thu Feb 5 14:36:28 2015 -0800

----------------------------------------------------------------------
 .../parquet/hadoop/InternalParquetRecordWriter.java |  4 ++++
 .../src/main/java/parquet/hadoop/MemoryManager.java | 16 +++++++++++++++-
 .../java/parquet/hadoop/ParquetOutputFormat.java    |  5 ++++-
 3 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/05adc21b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index 116f0c0..3e7e0e5 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -180,4 +180,8 @@ class InternalParquetRecordWriter<T> {
   void setRowGroupSizeThreshold(long rowGroupSizeThreshold) {
     this.rowGroupSizeThreshold = rowGroupSizeThreshold;
   }
+
+  MessageType getSchema() {
+    return this.schema;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/05adc21b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
index 7f3803a..fd399e0 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
@@ -19,6 +19,7 @@
 package parquet.hadoop;
 
 import parquet.Log;
+import parquet.ParquetRuntimeException;
 
 import java.lang.management.ManagementFactory;
 import java.util.HashMap;
@@ -39,16 +40,19 @@ import java.util.Map;
 public class MemoryManager {
   private static final Log LOG = Log.getLog(MemoryManager.class);
   static final float DEFAULT_MEMORY_POOL_RATIO = 0.95f;
+  static final long DEFAULT_MIN_MEMORY_ALLOCATION = ParquetWriter.DEFAULT_PAGE_SIZE;
   private final float memoryPoolRatio;
 
   private final long totalMemoryPool;
+  private final long minMemoryAllocation;
   private final Map<InternalParquetRecordWriter, Long> writerList = new
       HashMap<InternalParquetRecordWriter, Long>();
 
-  public MemoryManager(float ratio) {
+  public MemoryManager(float ratio, long minAllocation) {
     checkRatio(ratio);
 
     memoryPoolRatio = ratio;
+    minMemoryAllocation = minAllocation;
     totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax
         () * ratio);
     LOG.debug(String.format("Allocated total memory pool is: %,d", totalMemoryPool));
@@ -106,8 +110,18 @@ public class MemoryManager {
       scale = (double) totalMemoryPool / totalAllocations;
     }
 
+    int maxColCount = 0;
+    for (InternalParquetRecordWriter w : writerList.keySet()) {
+      maxColCount = Math.max(w.getSchema().getColumns().size(), maxColCount);
+    }
+
     for (Map.Entry<InternalParquetRecordWriter, Long> entry : writerList.entrySet()) {
       long newSize = (long) Math.floor(entry.getValue() * scale);
+      if(minMemoryAllocation > 0 && newSize/maxColCount < minMemoryAllocation) {
+          throw new ParquetRuntimeException(String.format("New Memory allocation %d"+
+          " exceeds minimum allocation size %d with largest schema having %d columns",
+              newSize, minMemoryAllocation, maxColCount)){};
+      }
       entry.getKey().setRowGroupSizeThreshold(newSize);
       LOG.debug(String.format("Adjust block size from %,d to %,d for writer: %s",
             entry.getValue(), newSize, entry.getKey()));

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/05adc21b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
index 9788a0e..43648f4 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
@@ -108,6 +108,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static final String WRITER_VERSION       = "parquet.writer.version";
   public static final String ENABLE_JOB_SUMMARY   = "parquet.enable.summary-metadata";
   public static final String MEMORY_POOL_RATIO    = "parquet.memory.pool.ratio";
+  public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
 
   public static void setWriteSupportClass(Job job,  Class<?> writeSupportClass) {
     getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
@@ -290,8 +291,10 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
 
     float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
         MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
+    long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
+        MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
     if (memoryManager == null) {
-      memoryManager = new MemoryManager(maxLoad);
+      memoryManager = new MemoryManager(maxLoad, minAllocation);
     } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
       LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
           "be reset by the new value: " + maxLoad);