You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/08/04 00:51:33 UTC
[hudi] branch master updated: [HUDI-2090] Ensure Disk Maps create a
subfolder with appropriate prefixes and cleans them up on close (#3329)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b4c14ea [HUDI-2090] Ensure Disk Maps create a subfolder with appropriate prefixes and cleans them up on close (#3329)
b4c14ea is described below
commit b4c14eaa29ed4f3b0f8f44dbfdd75ee68702645b
Author: rmahindra123 <76...@users.noreply.github.com>
AuthorDate: Tue Aug 3 17:51:25 2021 -0700
[HUDI-2090] Ensure Disk Maps create a subfolder with appropriate prefixes and cleans them up on close (#3329)
* Add UUID to the folder name for External Spillable File System
* Fix to ensure that Disk Map folders do not interfere across users
* Fix test
* Fix test
* Rebase with latest master and address comments
* Add Shutdown Hooks for the Disk Map
Co-authored-by: Rajesh Mahindra <rm...@Rajeshs-MacBook-Pro.local>
---
.../table/view/FileSystemViewStorageConfig.java | 2 +-
.../common/util/collection/BitCaskDiskMap.java | 28 +++------
.../hudi/common/util/collection/DiskMap.java | 69 ++++++++++++++++++++--
.../common/util/collection/RocksDbDiskMap.java | 22 +++----
.../common/util/collection/TestBitCaskDiskMap.java | 14 +++++
.../util/collection/TestExternalSpillableMap.java | 1 +
6 files changed, 98 insertions(+), 38 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
index 80d0d57..603f6bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
@@ -72,7 +72,7 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
public static final ConfigProperty<String> FILESYSTEM_VIEW_SPILLABLE_DIR = ConfigProperty
.key("hoodie.filesystem.view.spillable.dir")
- .defaultValue("/tmp/view_map/")
+ .defaultValue("/tmp/")
.withDocumentation("Path on local storage to use, when file system view is held in a spillable map.");
public static final ConfigProperty<Long> FILESYSTEM_VIEW_SPILLABLE_MEM = ConfigProperty
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
index d9e9701..7590e9a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
@@ -61,13 +61,14 @@ import java.util.zip.InflaterInputStream;
*
* Inspired by https://github.com/basho/bitcask
*/
-public final class BitCaskDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
+public final class BitCaskDiskMap<T extends Serializable, R extends Serializable> extends DiskMap<T, R> {
public static final int BUFFER_SIZE = 128 * 1024; // 128 KB
private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class);
// Caching byte compression/decompression to avoid creating instances for every operation
private static final ThreadLocal<CompressionHandler> DISK_COMPRESSION_REF =
ThreadLocal.withInitial(CompressionHandler::new);
+
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, ValueMetadata> valueMetadataMap;
// Enables compression for all values stored in the disk map
@@ -87,12 +88,11 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
private final ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new ThreadLocal<>();
private final Queue<BufferedRandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<>();
- private transient Thread shutdownThread = null;
-
public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) throws IOException {
+ super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name());
this.valueMetadataMap = new ConcurrentHashMap<>();
this.isCompressionEnabled = isCompressionEnabled;
- this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString());
+ this.writeOnlyFile = new File(diskMapPath, UUID.randomUUID().toString());
this.filePath = writeOnlyFile.getPath();
initFile(writeOnlyFile);
this.fileOutputStream = new FileOutputStream(writeOnlyFile, true);
@@ -138,16 +138,6 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
+ ")");
// Make sure file is deleted when JVM exits
writeOnlyFile.deleteOnExit();
- addShutDownHook();
- }
-
- /**
- * Register shutdown hook to force flush contents of the data written to FileOutputStream from OS page cache
- * (typically 4 KB) to disk.
- */
- private void addShutDownHook() {
- shutdownThread = new Thread(this::cleanup);
- Runtime.getRuntime().addShutdownHook(shutdownThread);
}
private void flushToDisk() {
@@ -267,14 +257,8 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
// reducing concurrency). Instead, just clear the pointer map. The file will be removed on exit.
}
+ @Override
public void close() {
- cleanup();
- if (shutdownThread != null) {
- Runtime.getRuntime().removeShutdownHook(shutdownThread);
- }
- }
-
- private void cleanup() {
valueMetadataMap.clear();
try {
if (writeOnlyFileHandle != null) {
@@ -297,6 +281,8 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
} catch (Exception e) {
// delete the file for any sort of exception
writeOnlyFile.delete();
+ } finally {
+ super.close();
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
index 5f3b441..c609212 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
@@ -18,8 +18,16 @@
package org.apache.hudi.common.util.collection;
+import org.apache.hudi.common.util.FileIOUtils;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
+import java.util.UUID;
import java.util.stream.Stream;
/**
@@ -29,21 +37,72 @@ import java.util.stream.Stream;
* @param <T> The generic type of the keys
* @param <R> The generic type of the values
*/
-public interface DiskMap<T extends Serializable, R extends Serializable> extends Map<T, R>, Iterable<R> {
+public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
+
+ private static final Logger LOG = LogManager.getLogger(DiskMap.class);
+ private static final String SUBFOLDER_PREFIX = "hudi";
+ private final File diskMapPathFile;
+ private transient Thread shutdownThread = null;
+
+ // Base path for the write file
+ protected final String diskMapPath;
+
+ public DiskMap(String basePath, String prefix) throws IOException {
+ this.diskMapPath =
+ String.format("%s/%s-%s-%s", basePath, SUBFOLDER_PREFIX, prefix, UUID.randomUUID().toString());
+ diskMapPathFile = new File(diskMapPath);
+ FileIOUtils.deleteDirectory(diskMapPathFile);
+ FileIOUtils.mkdir(diskMapPathFile);
+ // Make sure the folder is deleted when JVM exits
+ diskMapPathFile.deleteOnExit();
+ addShutDownHook();
+ }
+
+ /**
+ * Register shutdown hook to force flush contents of the data written to FileOutputStream from OS page cache
+ * (typically 4 KB) to disk.
+ */
+ private void addShutDownHook() {
+ shutdownThread = new Thread(this::cleanup);
+ Runtime.getRuntime().addShutdownHook(shutdownThread);
+ }
/**
* @returns a stream of the values stored in the disk.
*/
- Stream<R> valueStream();
+ abstract Stream<R> valueStream();
/**
* Number of bytes spilled to disk.
*/
- long sizeOfFileOnDiskInBytes();
+ abstract long sizeOfFileOnDiskInBytes();
+
+ /**
+ * Close and cleanup the Map.
+ */
+ public void close() {
+ cleanup(false);
+ }
/**
- * Cleanup.
+ * Cleanup all resources, files and folders
+ * triggered by shutdownhook.
*/
- void close();
+ private void cleanup() {
+ cleanup(true);
+ }
+ /**
+ * Cleanup all resources, files and folders.
+ */
+ private void cleanup(boolean isTriggeredFromShutdownHook) {
+ try {
+ FileIOUtils.deleteDirectory(diskMapPathFile);
+ } catch (IOException exception) {
+ LOG.warn("Error while deleting the disk map directory=" + diskMapPath, exception);
+ }
+ if (!isTriggeredFromShutdownHook && shutdownThread != null) {
+ Runtime.getRuntime().removeShutdownHook(shutdownThread);
+ }
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
index c500876..21211a5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
@@ -40,23 +40,22 @@ import java.util.stream.StreamSupport;
* This class provides a disk spillable only map implementation.
* All of the data is stored using the RocksDB implementation.
*/
-public final class RocksDbDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
+public final class RocksDbDiskMap<T extends Serializable, R extends Serializable> extends DiskMap<T, R> {
// ColumnFamily allows partitioning data within RockDB, which allows
// independent configuration and faster deletes across partitions
// https://github.com/facebook/rocksdb/wiki/Column-Families
// For this use case, we use a single static column family/ partition
//
- private static final String COLUMN_FAMILY_NAME = "spill_map";
+ private static final String ROCKSDB_COL_FAMILY = "rocksdb-diskmap";
private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
// Stores the key and corresponding value's latest metadata spilled to disk
private final Set<T> keySet;
- private final String rocksDbStoragePath;
private RocksDBDAO rocksDb;
public RocksDbDiskMap(String rocksDbStoragePath) throws IOException {
+ super(rocksDbStoragePath, ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
this.keySet = new HashSet<>();
- this.rocksDbStoragePath = rocksDbStoragePath;
}
@Override
@@ -84,12 +83,12 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
if (!containsKey(key)) {
return null;
}
- return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
+ return getRocksDb().get(ROCKSDB_COL_FAMILY, (T) key);
}
@Override
public R put(T key, R value) {
- getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
+ getRocksDb().put(ROCKSDB_COL_FAMILY, key, value);
keySet.add(key);
return value;
}
@@ -99,14 +98,14 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
R value = get(key);
if (value != null) {
keySet.remove((T) key);
- getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
+ getRocksDb().delete(ROCKSDB_COL_FAMILY, (T) key);
}
return value;
}
@Override
public void putAll(Map<? extends T, ? extends R> keyValues) {
- getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
+ getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, ROCKSDB_COL_FAMILY, key, value)));
keySet.addAll(keyValues.keySet());
}
@@ -139,7 +138,7 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
*/
@Override
public Iterator<R> iterator() {
- return getRocksDb().iterator(COLUMN_FAMILY_NAME);
+ return getRocksDb().iterator(ROCKSDB_COL_FAMILY);
}
@Override
@@ -159,14 +158,15 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
rocksDb.close();
}
rocksDb = null;
+ super.close();
}
private RocksDBDAO getRocksDb() {
if (null == rocksDb) {
synchronized (this) {
if (null == rocksDb) {
- rocksDb = new RocksDBDAO(COLUMN_FAMILY_NAME, rocksDbStoragePath);
- rocksDb.addColumnFamily(COLUMN_FAMILY_NAME);
+ rocksDb = new RocksDBDAO(ROCKSDB_COL_FAMILY, diskMapPath);
+ rocksDb.addColumnFamily(ROCKSDB_COL_FAMILY);
}
}
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
index dd3d3fc..208c6d9 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
@@ -41,6 +41,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
@@ -50,6 +51,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -89,6 +91,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
Option<IndexedRecord> value = payload.getInsertValue(HoodieAvroUtils.addMetadataFields(getSimpleSchema()));
assertEquals(originalRecord, value.get());
}
+
+ verifyCleanup(records);
}
@ParameterizedTest
@@ -111,6 +115,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
oRecords.add(rec);
assert recordKeys.contains(rec.getRecordKey());
}
+
+ verifyCleanup(records);
}
@ParameterizedTest
@@ -154,6 +160,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
throw new UncheckedIOException(io);
}
}
+ verifyCleanup(records);
}
@Test
@@ -236,4 +243,11 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
System.out.println("Time taken :" + timeTaken);
assertTrue(timeTaken < 100);
}
+
+ private void verifyCleanup(BitCaskDiskMap<String, HoodieRecord> records) {
+ File basePathDir = new File(basePath);
+ assert Objects.requireNonNull(basePathDir.list()).length > 0;
+ records.close();
+ assertEquals(Objects.requireNonNull(basePathDir.list()).length, 0);
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
index 0513151..4fed5a8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
@@ -342,6 +342,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
public void testLargeInsertUpsert() {}
private static Stream<Arguments> testArguments() {
+ // Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap
return Stream.of(
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),