You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:34 UTC

[hudi] 24/45: [HUDI-4526] Improve spillableMapBasePath when disk directory is full (#6284)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e95a9f56ae2703b566e9c21e58bacbd6379fa35e
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Wed Nov 9 13:07:55 2022 +0800

    [HUDI-4526] Improve spillableMapBasePath when disk directory is full (#6284)
    
    (cherry picked from commit 371296173a7c51c325e6f9c3a3ef2ba5f6a89f6e)
---
 .../org/apache/hudi/config/HoodieMemoryConfig.java |  9 ++++--
 .../table/log/HoodieMergedLogRecordScanner.java    |  9 +++---
 .../org/apache/hudi/common/util/FileIOUtils.java   | 36 ++++++++++++++++++++++
 3 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
index 4e37796393..960ec61dc0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
@@ -22,9 +22,10 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
 
 import javax.annotation.concurrent.Immutable;
-
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -80,7 +81,11 @@ public class HoodieMemoryConfig extends HoodieConfig {
   public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
       .key("hoodie.memory.spillable.map.path")
       .defaultValue("/tmp/")
-      .withDocumentation("Default file path prefix for spillable map");
+      .withInferFunction(cfg -> { // infer from the first entry of FileIOUtils.getConfiguredLocalDirs() (YARN LOCAL_DIRS, else java.io.tmpdir)
+        String[] localDirs = FileIOUtils.getConfiguredLocalDirs(); // NOTE(review): may throw when CONTAINER_ID is set but LOCAL_DIRS is empty/unset
+        return (localDirs != null && localDirs.length > 0) ? Option.of(localDirs[0]) : Option.empty();
+      })
+      .withDocumentation("Default file path for spillable map");
 
   public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION = ConfigProperty
       .key("hoodie.memory.writestatus.failure.fraction")
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 5ef0a6821f..45975fbfde 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -34,12 +37,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.internal.schema.InternalSchema;
-
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -95,6 +93,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
           new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
+
       this.maxMemorySizeInBytes = maxMemorySizeInBytes;
     } catch (IOException e) {
       throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
index 6a9e2e1b35..426a703503 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
@@ -204,4 +204,40 @@ public class FileIOUtils {
   public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) {
     return readDataFromPath(fileSystem, detailPath, false);
   }
+
+  /**
+   * Returns the local directories Hudi may write files to; nothing is created here. Inside a YARN
+   * container these are the YARN-provided local dirs (which must be non-empty), otherwise the
+   * comma-split {@code java.io.tmpdir} system property, or {@code null} if that property is unset.
+   */
+  public static String[] getConfiguredLocalDirs() {
+    if (isRunningInYarnContainer()) {
+      // Each YARN host may lay out its disks differently, so use exactly what YARN advertised for
+      // this container. This assumes YARN already created these directories and restricted them
+      // to the current user.
+      return getYarnLocalDirs().split(",");
+    }
+    String tmpDir = System.getProperty("java.io.tmpdir");
+    if (tmpDir == null) {
+      return null;
+    }
+    return tmpDir.split(",");
+  }
+
+  private static boolean isRunningInYarnContainer() {
+    String containerId = System.getenv("CONTAINER_ID"); // this variable is set by YARN
+    return containerId != null;
+  }
+
+  /**
+   * Get the Yarn approved local directories from {@code LOCAL_DIRS}; throws HoodieIOException if unset or empty.
+   */
+  private static String getYarnLocalDirs() {
+    // ofNullable, not of: Option.of throws on null, so an unset LOCAL_DIRS would NPE before the check below.
+    String localDirs = Option.ofNullable(System.getenv("LOCAL_DIRS")).orElse("");
+    if (localDirs.isEmpty()) {
+      throw new HoodieIOException("Yarn Local dirs can't be empty");
+    }
+    return localDirs;
+  }
 }