You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/10/21 04:00:11 UTC
[incubator-hudi] branch master updated: [HUDI-283] : Ensure a sane minimum for merge buffer memory (#964)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dfdc0e4 [HUDI-283] : Ensure a sane minimum for merge buffer memory (#964)
dfdc0e4 is described below
commit dfdc0e40e1f85c49e580b31204621758e3d76ac5
Author: vinoth chandar <vi...@users.noreply.github.com>
AuthorDate: Sun Oct 20 21:00:04 2019 -0700
[HUDI-283] : Ensure a sane minimum for merge buffer memory (#964)
- Some environments, e.g., spark-shell, provide 0 for memory size
- This causes unnecessary performance degradation
---
.../java/org/apache/hudi/config/HoodieMemoryConfig.java | 14 +++++++++++---
.../main/java/org/apache/hudi/io/HoodieMergeHandle.java | 8 +++++---
2 files changed, 16 insertions(+), 6 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
index 88f07b9..be6b96e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
@@ -40,8 +40,10 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
public static final String MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP = "hoodie.memory.compaction.fraction";
// Default max memory fraction during compaction, excess spills to disk
public static final String DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION = String.valueOf(0.6);
- // Default memory size per compaction (used if SparkEnv is absent), excess spills to disk
- public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 1024 * 1024 * 1024L; // 1GB
+ // Default memory size (1GB) per compaction (used if SparkEnv is absent), excess spills to disk
+ public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 1024 * 1024 * 1024L;
+ // Minimum memory size (100MB) for the spillable map.
+ public static final long DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 100 * 1024 * 1024L;
// Property to set the max memory for merge
public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size";
// Property to set the max memory for compaction
@@ -91,6 +93,12 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withMaxMemoryMaxSize(long mergeMaxSize, long compactionMaxSize) {
+ props.setProperty(MAX_MEMORY_FOR_MERGE_PROP, String.valueOf(mergeMaxSize));
+ props.setProperty(MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf(compactionMaxSize));
+ return this;
+ }
+
public Builder withMaxMemoryFractionPerCompaction(double maxMemoryFractionPerCompaction) {
props.setProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, String.valueOf(maxMemoryFractionPerCompaction));
return this;
@@ -136,7 +144,7 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
double maxMemoryFractionForMerge = Double.valueOf(maxMemoryFraction);
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
- return maxMemoryForMerge;
+ return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);
} else {
return DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 4826c89..095dbd5 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -193,9 +193,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
try {
// Load the new records in a map
- logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
- this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
- config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
+ long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
+ logger.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
+ this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge,
+ config.getSpillableMapBasePath(), new DefaultSizeEstimator(),
+ new HoodieRecordSizeEstimator(originalSchema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}