You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:34 UTC
[hudi] 24/45: [HUDI-4526] Improve spillableMapBasePath when disk directory is full (#6284)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e95a9f56ae2703b566e9c21e58bacbd6379fa35e
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Wed Nov 9 13:07:55 2022 +0800
[HUDI-4526] Improve spillableMapBasePath when disk directory is full (#6284)
(cherry picked from commit 371296173a7c51c325e6f9c3a3ef2ba5f6a89f6e)
---
.../org/apache/hudi/config/HoodieMemoryConfig.java | 9 ++++--
.../table/log/HoodieMergedLogRecordScanner.java | 9 +++---
.../org/apache/hudi/common/util/FileIOUtils.java | 36 ++++++++++++++++++++++
3 files changed, 47 insertions(+), 7 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
index 4e37796393..960ec61dc0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
@@ -22,9 +22,10 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
import javax.annotation.concurrent.Immutable;
-
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@@ -80,7 +81,11 @@ public class HoodieMemoryConfig extends HoodieConfig {
public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
.key("hoodie.memory.spillable.map.path")
.defaultValue("/tmp/")
- .withDocumentation("Default file path prefix for spillable map");
+ .withInferFunction(cfg -> {
+ String[] localDirs = FileIOUtils.getConfiguredLocalDirs();
+ return (localDirs != null && localDirs.length > 0) ? Option.of(localDirs[0]) : Option.empty();
+ })
+ .withDocumentation("Default file path for spillable map");
public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION = ConfigProperty
.key("hoodie.memory.writestatus.failure.fraction")
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 5ef0a6821f..45975fbfde 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -18,6 +18,9 @@
package org.apache.hudi.common.table.log;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -34,12 +37,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.internal.schema.InternalSchema;
-
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -95,6 +93,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
+
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
index 6a9e2e1b35..426a703503 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
@@ -204,4 +204,40 @@ public class FileIOUtils {
public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) {
return readDataFromPath(fileSystem, detailPath, false);
}
+
+  /**
+   * Resolves the local directories hudi may write files to for the current deployment mode.
+   * This only locates the directories; it never creates them.
+   *
+   * @return the comma-split YARN {@code LOCAL_DIRS} value when running inside a YARN container,
+   *     otherwise the comma-split {@code java.io.tmpdir} property, or {@code null} if it is unset
+   */
+  public static String[] getConfiguredLocalDirs() {
+    if (isRunningInYarnContainer()) {
+      // Disk layouts differ between YARN nodes, so defer to the directories YARN advertises.
+      // Assumes YARN already created them and restricted access to the container's user.
+      return getYarnLocalDirs().split(",");
+    }
+
+    // NOTE(review): java.io.tmpdir is normally a single path; it is split on ',' like LOCAL_DIRS.
+    String tmpDir = System.getProperty("java.io.tmpdir");
+    return tmpDir == null ? null : tmpDir.split(",");
+  }
+
+  private static boolean isRunningInYarnContainer() {
+    String containerId = System.getenv("CONTAINER_ID"); // exported by YARN to its containers
+    return containerId != null;
+  }
+
+  /**
+   * Returns YARN's LOCAL_DIRS value; throws HoodieIOException if it is unset or empty.
+   */
+  private static String getYarnLocalDirs() {
+    // ofNullable, not of: Option.of(null) throws NPE, which would mask the error thrown below.
+    String localDirs = Option.ofNullable(System.getenv("LOCAL_DIRS")).orElse("");
+    if (localDirs.isEmpty()) {
+      throw new HoodieIOException("Yarn Local dirs can't be empty");
+    }
+    return localDirs;
+  }
}