Posted to commits@hudi.apache.org by vi...@apache.org on 2021/08/04 00:51:33 UTC

[hudi] branch master updated: [HUDI-2090] Ensure Disk Maps create a subfolder with appropriate prefixes and cleans them up on close (#3329)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b4c14ea  [HUDI-2090] Ensure Disk Maps create a subfolder with appropriate prefixes and cleans them up on close  (#3329)
b4c14ea is described below

commit b4c14eaa29ed4f3b0f8f44dbfdd75ee68702645b
Author: rmahindra123 <76...@users.noreply.github.com>
AuthorDate: Tue Aug 3 17:51:25 2021 -0700

    [HUDI-2090] Ensure Disk Maps create a subfolder with appropriate prefixes and cleans them up on close  (#3329)
    
    * Add UUID to the folder name for External Spillable File System
    
    * Fix to ensure that Disk map folders do not interfere across users
    
    * Fix test
    
    * Fix test
    
    * Rebase with latest master and address comments
    
    * Add Shutdown Hooks for the Disk Map
    
    Co-authored-by: Rajesh Mahindra <rm...@Rajeshs-MacBook-Pro.local>
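
The change summarized above moves spill-directory handling into the DiskMap base class, which now creates a per-instance subfolder named hudi-<type>-<uuid> under the configured base path and deletes it on close() or via a JVM shutdown hook. A minimal, hypothetical usage sketch follows (the class name and the /tmp base path are illustrative only; the constructor and close() behavior are taken from the diff below):

    import org.apache.hudi.common.util.collection.BitCaskDiskMap;

    // Illustrative sketch, not part of this commit. After this change, the map below
    // writes its spill file under a per-instance subfolder such as /tmp/hudi-BITCASK-<uuid>.
    public class DiskMapCleanupExample {
      public static void main(String[] args) throws Exception {
        BitCaskDiskMap<String, String> diskMap = new BitCaskDiskMap<>("/tmp", false);
        try {
          diskMap.put("key", "value");          // spills the value to the per-instance subfolder
          String readBack = diskMap.get("key"); // reads it back from disk
          System.out.println(readBack);
        } finally {
          diskMap.close(); // deletes the hudi-* subfolder and deregisters the shutdown hook
        }
      }
    }
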
---
 .../table/view/FileSystemViewStorageConfig.java    |  2 +-
 .../common/util/collection/BitCaskDiskMap.java     | 28 +++------
 .../hudi/common/util/collection/DiskMap.java       | 69 ++++++++++++++++++++--
 .../common/util/collection/RocksDbDiskMap.java     | 22 +++----
 .../common/util/collection/TestBitCaskDiskMap.java | 14 +++++
 .../util/collection/TestExternalSpillableMap.java  |  1 +
 6 files changed, 98 insertions(+), 38 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
index 80d0d57..603f6bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
@@ -72,7 +72,7 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> FILESYSTEM_VIEW_SPILLABLE_DIR = ConfigProperty
       .key("hoodie.filesystem.view.spillable.dir")
-      .defaultValue("/tmp/view_map/")
+      .defaultValue("/tmp/")
       .withDocumentation("Path on local storage to use, when file system view is held in a spillable map.");
 
   public static final ConfigProperty<Long> FILESYSTEM_VIEW_SPILLABLE_MEM = ConfigProperty
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
index d9e9701..7590e9a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
@@ -61,13 +61,14 @@ import java.util.zip.InflaterInputStream;
  *
  * Inspired by https://github.com/basho/bitcask
  */
-public final class BitCaskDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
+public final class BitCaskDiskMap<T extends Serializable, R extends Serializable> extends DiskMap<T, R> {
 
   public static final int BUFFER_SIZE = 128 * 1024;  // 128 KB
   private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class);
   // Caching byte compression/decompression to avoid creating instances for every operation
   private static final ThreadLocal<CompressionHandler> DISK_COMPRESSION_REF =
       ThreadLocal.withInitial(CompressionHandler::new);
+
   // Stores the key and corresponding value's latest metadata spilled to disk
   private final Map<T, ValueMetadata> valueMetadataMap;
   // Enables compression for all values stored in the disk map
@@ -87,12 +88,11 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
   private final ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new ThreadLocal<>();
   private final Queue<BufferedRandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<>();
 
-  private transient Thread shutdownThread = null;
-
   public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) throws IOException {
+    super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name());
     this.valueMetadataMap = new ConcurrentHashMap<>();
     this.isCompressionEnabled = isCompressionEnabled;
-    this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString());
+    this.writeOnlyFile = new File(diskMapPath, UUID.randomUUID().toString());
     this.filePath = writeOnlyFile.getPath();
     initFile(writeOnlyFile);
     this.fileOutputStream = new FileOutputStream(writeOnlyFile, true);
@@ -138,16 +138,6 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
         + ")");
     // Make sure file is deleted when JVM exits
     writeOnlyFile.deleteOnExit();
-    addShutDownHook();
-  }
-
-  /**
-   * Register shutdown hook to force flush contents of the data written to FileOutputStream from OS page cache
-   * (typically 4 KB) to disk.
-   */
-  private void addShutDownHook() {
-    shutdownThread = new Thread(this::cleanup);
-    Runtime.getRuntime().addShutdownHook(shutdownThread);
   }
 
   private void flushToDisk() {
@@ -267,14 +257,8 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
     // reducing concurrency). Instead, just clear the pointer map. The file will be removed on exit.
   }
 
+  @Override
   public void close() {
-    cleanup();
-    if (shutdownThread != null) {
-      Runtime.getRuntime().removeShutdownHook(shutdownThread);
-    }
-  }
-
-  private void cleanup() {
     valueMetadataMap.clear();
     try {
       if (writeOnlyFileHandle != null) {
@@ -297,6 +281,8 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
     } catch (Exception e) {
       // delete the file for any sort of exception
       writeOnlyFile.delete();
+    } finally {
+      super.close();
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
index 5f3b441..c609212 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
@@ -18,8 +18,16 @@
 
 package org.apache.hudi.common.util.collection;
 
+import org.apache.hudi.common.util.FileIOUtils;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Stream;
 
 /**
@@ -29,21 +37,72 @@ import java.util.stream.Stream;
  * @param <T> The generic type of the keys
  * @param <R> The generic type of the values
  */
-public interface DiskMap<T extends Serializable, R extends Serializable> extends Map<T, R>, Iterable<R> {
+public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
+
+  private static final Logger LOG = LogManager.getLogger(DiskMap.class);
+  private static final String SUBFOLDER_PREFIX = "hudi";
+  private final File diskMapPathFile;
+  private transient Thread shutdownThread = null;
+
+  // Base path for the write file
+  protected final String diskMapPath;
+
+  public DiskMap(String basePath, String prefix) throws IOException {
+    this.diskMapPath =
+        String.format("%s/%s-%s-%s", basePath, SUBFOLDER_PREFIX, prefix, UUID.randomUUID().toString());
+    diskMapPathFile = new File(diskMapPath);
+    FileIOUtils.deleteDirectory(diskMapPathFile);
+    FileIOUtils.mkdir(diskMapPathFile);
+    // Make sure the folder is deleted when JVM exits
+    diskMapPathFile.deleteOnExit();
+    addShutDownHook();
+  }
+
+  /**
+   * Register shutdown hook to force flush contents of the data written to FileOutputStream from OS page cache
+   * (typically 4 KB) to disk.
+   */
+  private void addShutDownHook() {
+    shutdownThread = new Thread(this::cleanup);
+    Runtime.getRuntime().addShutdownHook(shutdownThread);
+  }
 
   /**
    * @returns a stream of the values stored in the disk.
    */
-  Stream<R> valueStream();
+  abstract Stream<R> valueStream();
 
   /**
    * Number of bytes spilled to disk.
    */
-  long sizeOfFileOnDiskInBytes();
+  abstract long sizeOfFileOnDiskInBytes();
+
+  /**
+   * Close and cleanup the Map.
+   */
+  public void close() {
+    cleanup(false);
+  }
 
   /**
-   * Cleanup.
+   * Cleanup all resources, files and folders
+   * triggered by shutdownhook.
    */
-  void close();
+  private void cleanup() {
+    cleanup(true);
+  }
 
+  /**
+   * Cleanup all resources, files and folders.
+   */
+  private void cleanup(boolean isTriggeredFromShutdownHook) {
+    try {
+      FileIOUtils.deleteDirectory(diskMapPathFile);
+    } catch (IOException exception) {
+      LOG.warn("Error while deleting the disk map directory=" + diskMapPath, exception);
+    }
+    if (!isTriggeredFromShutdownHook && shutdownThread != null) {
+      Runtime.getRuntime().removeShutdownHook(shutdownThread);
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
index c500876..21211a5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
@@ -40,23 +40,22 @@ import java.util.stream.StreamSupport;
  * This class provides a disk spillable only map implementation.
  * All of the data is stored using the RocksDB implementation.
  */
-public final class RocksDbDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
+public final class RocksDbDiskMap<T extends Serializable, R extends Serializable> extends DiskMap<T, R> {
   // ColumnFamily allows partitioning data within RockDB, which allows
   // independent configuration and faster deletes across partitions
   // https://github.com/facebook/rocksdb/wiki/Column-Families
   // For this use case, we use a single static column family/ partition
   //
-  private static final String COLUMN_FAMILY_NAME = "spill_map";
+  private static final String ROCKSDB_COL_FAMILY = "rocksdb-diskmap";
 
   private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
   // Stores the key and corresponding value's latest metadata spilled to disk
   private final Set<T> keySet;
-  private final String rocksDbStoragePath;
   private RocksDBDAO rocksDb;
 
   public RocksDbDiskMap(String rocksDbStoragePath) throws IOException {
+    super(rocksDbStoragePath, ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
     this.keySet = new HashSet<>();
-    this.rocksDbStoragePath = rocksDbStoragePath;
   }
 
   @Override
@@ -84,12 +83,12 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
     if (!containsKey(key)) {
       return null;
     }
-    return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
+    return getRocksDb().get(ROCKSDB_COL_FAMILY, (T) key);
   }
 
   @Override
   public R put(T key, R value) {
-    getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
+    getRocksDb().put(ROCKSDB_COL_FAMILY, key, value);
     keySet.add(key);
     return value;
   }
@@ -99,14 +98,14 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
     R value = get(key);
     if (value != null) {
       keySet.remove((T) key);
-      getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
+      getRocksDb().delete(ROCKSDB_COL_FAMILY, (T) key);
     }
     return value;
   }
 
   @Override
   public void putAll(Map<? extends T, ? extends R> keyValues) {
-    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, ROCKSDB_COL_FAMILY, key, value)));
     keySet.addAll(keyValues.keySet());
   }
 
@@ -139,7 +138,7 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
    */
   @Override
   public Iterator<R> iterator() {
-    return getRocksDb().iterator(COLUMN_FAMILY_NAME);
+    return getRocksDb().iterator(ROCKSDB_COL_FAMILY);
   }
 
   @Override
@@ -159,14 +158,15 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
       rocksDb.close();
     }
     rocksDb = null;
+    super.close();
   }
 
   private RocksDBDAO getRocksDb() {
     if (null == rocksDb) {
       synchronized (this) {
         if (null == rocksDb) {
-          rocksDb = new RocksDBDAO(COLUMN_FAMILY_NAME, rocksDbStoragePath);
-          rocksDb.addColumnFamily(COLUMN_FAMILY_NAME);
+          rocksDb = new RocksDBDAO(ROCKSDB_COL_FAMILY, diskMapPath);
+          rocksDb.addColumnFamily(ROCKSDB_COL_FAMILY);
         }
       }
     }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
index dd3d3fc..208c6d9 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
@@ -41,6 +41,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.URISyntaxException;
@@ -50,6 +51,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -89,6 +91,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
       Option<IndexedRecord> value = payload.getInsertValue(HoodieAvroUtils.addMetadataFields(getSimpleSchema()));
       assertEquals(originalRecord, value.get());
     }
+
+    verifyCleanup(records);
   }
 
   @ParameterizedTest
@@ -111,6 +115,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
       oRecords.add(rec);
       assert recordKeys.contains(rec.getRecordKey());
     }
+
+    verifyCleanup(records);
   }
 
   @ParameterizedTest
@@ -154,6 +160,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
         throw new UncheckedIOException(io);
       }
     }
+    verifyCleanup(records);
   }
 
   @Test
@@ -236,4 +243,11 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
     System.out.println("Time taken :" + timeTaken);
     assertTrue(timeTaken < 100);
   }
+
+  private void verifyCleanup(BitCaskDiskMap<String, HoodieRecord> records) {
+    File basePathDir = new File(basePath);
+    assert Objects.requireNonNull(basePathDir.list()).length > 0;
+    records.close();
+    assertEquals(Objects.requireNonNull(basePathDir.list()).length, 0);
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
index 0513151..4fed5a8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
@@ -342,6 +342,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
   public void testLargeInsertUpsert() {}
 
   private static Stream<Arguments> testArguments() {
+    // Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap
     return Stream.of(
         arguments(ExternalSpillableMap.DiskMapType.BITCASK, false),
         arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),