You are viewing a plain-text version of this content. The canonical version is available at the original archive link, which is not preserved in this text rendering.
Posted to commits@parquet.apache.org by dw...@apache.org on 2015/02/05 23:36:50 UTC
incubator-parquet-mr git commit: PARQUET-177: Added lower bound to memory manager resize
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master 668d031d7 -> 05adc21b1
PARQUET-177: Added lower bound to memory manager resize
PARQUET-177
Author: Daniel Weeks <dw...@netflix.com>
Closes #115 from danielcweeks/memory-manager-limit and squashes the following commits:
b2e4708 [Daniel Weeks] Updated to base memory allocation off estimated chunk size
09d7aa3 [Daniel Weeks] Updated property name and default value
8f6cff1 [Daniel Weeks] Added low bound to memory manager resize
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/05adc21b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/05adc21b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/05adc21b
Branch: refs/heads/master
Commit: 05adc21b15dbe30d9bded0cde56f482f1c932d6f
Parents: 668d031
Author: Daniel Weeks <dw...@netflix.com>
Authored: Thu Feb 5 14:36:28 2015 -0800
Committer: Daniel Weeks <dw...@netflix.com>
Committed: Thu Feb 5 14:36:28 2015 -0800
----------------------------------------------------------------------
.../parquet/hadoop/InternalParquetRecordWriter.java | 4 ++++
.../src/main/java/parquet/hadoop/MemoryManager.java | 16 +++++++++++++++-
.../java/parquet/hadoop/ParquetOutputFormat.java | 5 ++++-
3 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/05adc21b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index 116f0c0..3e7e0e5 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -180,4 +180,8 @@ class InternalParquetRecordWriter<T> {
void setRowGroupSizeThreshold(long rowGroupSizeThreshold) {
this.rowGroupSizeThreshold = rowGroupSizeThreshold;
}
+
+ MessageType getSchema() {
+ return this.schema;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/05adc21b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
index 7f3803a..fd399e0 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
@@ -19,6 +19,7 @@
package parquet.hadoop;
import parquet.Log;
+import parquet.ParquetRuntimeException;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
@@ -39,16 +40,19 @@ import java.util.Map;
public class MemoryManager {
private static final Log LOG = Log.getLog(MemoryManager.class);
static final float DEFAULT_MEMORY_POOL_RATIO = 0.95f;
+ static final long DEFAULT_MIN_MEMORY_ALLOCATION = ParquetWriter.DEFAULT_PAGE_SIZE;
private final float memoryPoolRatio;
private final long totalMemoryPool;
+ private final long minMemoryAllocation;
private final Map<InternalParquetRecordWriter, Long> writerList = new
HashMap<InternalParquetRecordWriter, Long>();
- public MemoryManager(float ratio) {
+ public MemoryManager(float ratio, long minAllocation) {
checkRatio(ratio);
memoryPoolRatio = ratio;
+ minMemoryAllocation = minAllocation;
totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax
() * ratio);
LOG.debug(String.format("Allocated total memory pool is: %,d", totalMemoryPool));
@@ -106,8 +110,18 @@ public class MemoryManager {
scale = (double) totalMemoryPool / totalAllocations;
}
+ int maxColCount = 0;
+ for (InternalParquetRecordWriter w : writerList.keySet()) {
+ maxColCount = Math.max(w.getSchema().getColumns().size(), maxColCount);
+ }
+
for (Map.Entry<InternalParquetRecordWriter, Long> entry : writerList.entrySet()) {
long newSize = (long) Math.floor(entry.getValue() * scale);
+ if(minMemoryAllocation > 0 && newSize/maxColCount < minMemoryAllocation) {
+ throw new ParquetRuntimeException(String.format("New Memory allocation %d"+
+ " exceeds minimum allocation size %d with largest schema having %d columns",
+ newSize, minMemoryAllocation, maxColCount)){};
+ }
entry.getKey().setRowGroupSizeThreshold(newSize);
LOG.debug(String.format("Adjust block size from %,d to %,d for writer: %s",
entry.getValue(), newSize, entry.getKey()));
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/05adc21b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
index 9788a0e..43648f4 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
@@ -108,6 +108,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
public static final String WRITER_VERSION = "parquet.writer.version";
public static final String ENABLE_JOB_SUMMARY = "parquet.enable.summary-metadata";
public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio";
+ public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
public static void setWriteSupportClass(Job job, Class<?> writeSupportClass) {
getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
@@ -290,8 +291,10 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
+ long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
+ MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
if (memoryManager == null) {
- memoryManager = new MemoryManager(maxLoad);
+ memoryManager = new MemoryManager(maxLoad, minAllocation);
} else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
"be reset by the new value: " + maxLoad);