Posted to common-commits@hadoop.apache.org by um...@apache.org on 2019/03/31 06:55:00 UTC
[hadoop] branch trunk updated: HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 35ff31d HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.
35ff31d is described below
commit 35ff31dd9462cf4fb4ebf5556ee8ae6bcd7c5c3a
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Sat Mar 30 23:33:25 2019 -0700
HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.
---
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 19 ++
.../apache/hadoop/hdfs/server/datanode/DNConf.java | 31 ++
.../datanode/fsdataset/impl/FsDatasetCache.java | 90 +++++-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 20 +-
.../datanode/fsdataset/impl/FsDatasetUtil.java | 27 ++
.../fsdataset/impl/MappableBlockLoader.java | 41 ++-
.../fsdataset/impl/MemoryMappableBlockLoader.java | 45 ++-
...ockLoader.java => PmemMappableBlockLoader.java} | 132 ++++++---
.../datanode/fsdataset/impl/PmemMappedBlock.java | 70 +++++
.../datanode/fsdataset/impl/PmemVolumeManager.java | 306 +++++++++++++++++++
.../src/main/resources/hdfs-default.xml | 34 +++
.../impl/TestCacheByPmemMappableBlockLoader.java | 329 +++++++++++++++++++++
12 files changed, 1077 insertions(+), 67 deletions(-)
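For orientation before the diff: a DataNode opts into the new pmem cache by pointing dfs.datanode.cache.loader.class at PmemMappableBlockLoader and configuring the pmem directories and capacity. A minimal sketch, assuming the hypothetical mount points /mnt/pmem0 and /mnt/pmem1 (the keys are the ones added in DFSConfigKeys.java below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class PmemCacheConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.datanode.cache.loader.class",
            "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl."
                + "PmemMappableBlockLoader");
        // Comma-separated pmem volumes; each must be an existing directory.
        conf.set("dfs.datanode.cache.pmem.dirs", "/mnt/pmem0,/mnt/pmem1");
        // Pmem cache capacity in bytes; 0 (the default) disables pmem caching.
        conf.setLong("dfs.datanode.cache.pmem.capacity",
            64L * 1024 * 1024 * 1024);
      }
    }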
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b3cd0a4..5139f1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@@ -390,6 +392,23 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
+  // Currently, the available cache loaders are MemoryMappableBlockLoader and
+  // PmemMappableBlockLoader. MemoryMappableBlockLoader is the default and
+  // caches block replicas in memory.
+ public static final String DFS_DATANODE_CACHE_LOADER_CLASS =
+ "dfs.datanode.cache.loader.class";
+ public static final Class<? extends MappableBlockLoader>
+ DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT =
+ MemoryMappableBlockLoader.class;
+ // Multiple dirs separated by "," are acceptable.
+ public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
+ "dfs.datanode.cache.pmem.dirs";
+ public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
+ // The cache capacity of persistent memory
+ public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY =
+ "dfs.datanode.cache.pmem.capacity";
+ public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L;
+
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index de26c90..69375b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -27,6 +27,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
@@ -64,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
import org.apache.hadoop.security.SaslPropertiesResolver;
import java.util.concurrent.TimeUnit;
@@ -112,7 +118,10 @@ public class DNConf {
final long xceiverStopTimeout;
final long restartReplicaExpiry;
+ private final Class<? extends MappableBlockLoader> cacheLoaderClass;
final long maxLockedMemory;
+ private final long maxLockedPmem;
+ private final String[] pmemDirs;
private final long bpReadyTimeout;
@@ -251,10 +260,20 @@ public class DNConf {
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
+ this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS,
+ DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class);
+
this.maxLockedMemory = getConf().getLongBytes(
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
+ this.maxLockedPmem = getConf().getLongBytes(
+ DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY,
+ DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT);
+
+ this.pmemDirs = getConf().getTrimmedStrings(
+ DFS_DATANODE_CACHE_PMEM_DIRS_KEY);
+
this.restartReplicaExpiry = getConf().getLong(
DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
@@ -317,6 +336,10 @@ public class DNConf {
return maxLockedMemory;
}
+ public long getMaxLockedPmem() {
+ return maxLockedPmem;
+ }
+
/**
* Returns true if connect to datanode via hostname
*
@@ -419,4 +442,12 @@ public class DNConf {
int getMaxDataLength() {
return maxDataLength;
}
+
+ public Class<? extends MappableBlockLoader> getCacheLoaderClass() {
+ return cacheLoaderClass;
+ }
+
+ public String[] getPmemVolumes() {
+ return pmemDirs;
+ }
}
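DNConf reads the volume list with Configuration.getTrimmedStrings(), so whitespace around the separators is tolerated. A small illustration (a fragment; the values are hypothetical):

    Configuration conf = new Configuration();
    conf.set("dfs.datanode.cache.pmem.dirs", " /mnt/pmem0 , /mnt/pmem1 ");
    String[] dirs = conf.getTrimmedStrings("dfs.datanode.cache.pmem.dirs");
    // dirs == {"/mnt/pmem0", "/mnt/pmem1"}: split on "," and trimmed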
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index f90a4b1..dce84c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -48,10 +49,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,7 +132,11 @@ public class FsDatasetCache {
private final long revocationPollingMs;
- private final MappableBlockLoader mappableBlockLoader;
+ /**
+   * The configured cache loader caches block replicas either to DRAM or
+   * to persistent memory.
+ */
+ private final MappableBlockLoader cacheLoader;
private final MemoryCacheStats memCacheStats;
@@ -173,11 +179,41 @@ public class FsDatasetCache {
". Reconfigure this to " + minRevocationPollingMs);
}
this.revocationPollingMs = confRevocationPollingMs;
-
- this.mappableBlockLoader = new MemoryMappableBlockLoader(this);
// Both lazy writer and read cache are sharing this statistics.
this.memCacheStats = new MemoryCacheStats(
dataset.datanode.getDnConf().getMaxLockedMemory());
+
+ Class<? extends MappableBlockLoader> cacheLoaderClass =
+ dataset.datanode.getDnConf().getCacheLoaderClass();
+ this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null);
+ cacheLoader.initialize(this);
+ }
+
+ /**
+ * Check if pmem cache is enabled.
+ */
+ private boolean isPmemCacheEnabled() {
+ return !cacheLoader.isTransientCache();
+ }
+
+ DNConf getDnConf() {
+ return this.dataset.datanode.getDnConf();
+ }
+
+ MemoryCacheStats getMemCacheStats() {
+ return memCacheStats;
+ }
+
+ /**
+   * Get the cache path if the replica is cached to persistent memory.
+ */
+ String getReplicaCachePath(String bpid, long blockId) {
+ if (cacheLoader.isTransientCache() ||
+ !isCached(bpid, blockId)) {
+ return null;
+ }
+ ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
+ return cacheLoader.getCachedPath(key);
}
/**
@@ -344,14 +380,14 @@ public class FsDatasetCache {
MappableBlock mappableBlock = null;
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
key.getBlockId(), length, genstamp);
- long newUsedBytes = mappableBlockLoader.reserve(length);
+ long newUsedBytes = cacheLoader.reserve(length);
boolean reservedBytes = false;
try {
if (newUsedBytes < 0) {
LOG.warn("Failed to cache " + key + ": could not reserve " + length +
" more bytes in the cache: " +
- DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
- " of " + memCacheStats.getCacheCapacity() + " exceeded.");
+ cacheLoader.getCacheCapacityConfigKey() +
+ " of " + cacheLoader.getCacheCapacity() + " exceeded.");
return;
}
reservedBytes = true;
@@ -370,8 +406,9 @@ public class FsDatasetCache {
LOG.warn("Failed to cache " + key + ": failed to open file", e);
return;
}
+
try {
- mappableBlock = mappableBlockLoader.load(length, blockIn, metaIn,
+ mappableBlock = cacheLoader.load(length, blockIn, metaIn,
blockFileName, key);
} catch (ChecksumException e) {
// Exception message is bogus since this wasn't caused by a file read
@@ -381,6 +418,7 @@ public class FsDatasetCache {
LOG.warn("Failed to cache the block [key=" + key + "]!", e);
return;
}
+
synchronized (FsDatasetCache.this) {
Value value = mappableBlockMap.get(key);
Preconditions.checkNotNull(value);
@@ -404,7 +442,7 @@ public class FsDatasetCache {
IOUtils.closeQuietly(metaIn);
if (!success) {
if (reservedBytes) {
- mappableBlockLoader.release(length);
+ cacheLoader.release(length);
}
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, memCacheStats.getCacheUsed());
@@ -481,8 +519,7 @@ public class FsDatasetCache {
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
}
- long newUsedBytes = mappableBlockLoader
- .release(value.mappableBlock.getLength());
+ long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
if (revocationTimeMs != 0) {
@@ -498,19 +535,41 @@ public class FsDatasetCache {
// Stats related methods for FSDatasetMBean
/**
- * Get the approximate amount of cache space used.
+ * Get the approximate amount of DRAM cache space used.
*/
public long getCacheUsed() {
return memCacheStats.getCacheUsed();
}
/**
- * Get the maximum amount of bytes we can cache. This is a constant.
+ * Get the approximate amount of persistent memory cache space used.
+ * TODO: advertise this metric to NameNode by FSDatasetMBean
+ */
+ public long getPmemCacheUsed() {
+ if (isPmemCacheEnabled()) {
+ return cacheLoader.getCacheUsed();
+ }
+ return 0;
+ }
+
+ /**
+ * Get the maximum amount of bytes we can cache on DRAM. This is a constant.
*/
public long getCacheCapacity() {
return memCacheStats.getCacheCapacity();
}
+ /**
+ * Get cache capacity of persistent memory.
+ * TODO: advertise this metric to NameNode by FSDatasetMBean
+ */
+ public long getPmemCacheCapacity() {
+ if (isPmemCacheEnabled()) {
+ return cacheLoader.getCacheCapacity();
+ }
+ return 0;
+ }
+
public long getNumBlocksFailedToCache() {
return numBlocksFailedToCache.get();
}
@@ -528,4 +587,9 @@ public class FsDatasetCache {
Value val = mappableBlockMap.get(block);
return (val != null) && val.state.shouldAdvertise();
}
+
+ @VisibleForTesting
+ MappableBlockLoader getCacheLoader() {
+ return cacheLoader;
+ }
}
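The caching path in FsDatasetCache follows a reserve / load / release-on-failure protocol against whichever loader is configured. Reduced to a sketch (identifiers from this patch; logging and stream handling elided):

    long newUsedBytes = cacheLoader.reserve(length); // -1 if over capacity
    if (newUsedBytes < 0) {
      return; // cacheLoader.getCacheCapacityConfigKey() names the limit hit
    }
    MappableBlock mappableBlock = null;
    try {
      mappableBlock = cacheLoader.load(length, blockIn, metaIn,
          blockFileName, key);
    } finally {
      if (mappableBlock == null) {
        cacheLoader.release(length); // roll back the reservation on failure
      }
    }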
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ad43b45..6fe0d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -787,11 +787,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
datanode.getMetrics().incrRamDiskBlocksReadHits();
}
- if (info != null) {
- return info.getDataInputStream(seekOffset);
- } else {
+ if (info == null) {
throw new IOException("No data exists for block " + b);
}
+ return getBlockInputStreamWithCheckingPmemCache(info, b, seekOffset);
+ }
+
+ /**
+ * Check whether the replica is cached to persistent memory.
+ * If so, get DataInputStream of the corresponding cache file on pmem.
+ */
+ private InputStream getBlockInputStreamWithCheckingPmemCache(
+ ReplicaInfo info, ExtendedBlock b, long seekOffset) throws IOException {
+ String cachePath = cacheManager.getReplicaCachePath(
+ b.getBlockPoolId(), b.getBlockId());
+ if (cachePath != null) {
+ return FsDatasetUtil.getInputStreamAndSeek(
+ new File(cachePath), seekOffset);
+ }
+ return info.getDataInputStream(seekOffset);
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 8a3b237..92c0888 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -27,6 +27,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.URI;
+import java.nio.channels.Channels;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Arrays;
import com.google.common.base.Preconditions;
@@ -116,6 +119,19 @@ public class FsDatasetUtil {
}
}
+ public static InputStream getInputStreamAndSeek(File file, long offset)
+ throws IOException {
+ RandomAccessFile raf = null;
+ try {
+ raf = new RandomAccessFile(file, "r");
+ raf.seek(offset);
+ return Channels.newInputStream(raf.getChannel());
+ } catch(IOException ioe) {
+ IOUtils.cleanupWithLogger(null, raf);
+ throw ioe;
+ }
+ }
+
/**
* Find the meta-file for the specified block file and then return the
* generation stamp from the name of the meta-file. Generally meta file will
@@ -183,4 +199,15 @@ public class FsDatasetUtil {
FsDatasetImpl.computeChecksum(wrapper, dstMeta, smallBufferSize, conf);
}
+
+ public static void deleteMappedFile(String filePath) throws IOException {
+ if (filePath == null) {
+ throw new IOException("The filePath should not be null!");
+ }
+ boolean result = Files.deleteIfExists(Paths.get(filePath));
+ if (!result) {
+ throw new IOException(
+ "Failed to delete the mapped file: " + filePath);
+ }
+ }
}
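For illustration, the two helpers added above could be used as follows (a sketch; the cache path is a hypothetical example; getInputStreamAndSeek returns a stream positioned at the requested offset, and deleteMappedFile throws if the file is absent):

    File cacheFile = new File("/mnt/pmem0/BP-123-127.0.0.1-1-1073741825");
    try (InputStream in =
             FsDatasetUtil.getInputStreamAndSeek(cacheFile, 4096)) {
      // reads start at offset 4096 of the pmem cache file
    }
    FsDatasetUtil.deleteMappedFile(cacheFile.getPath());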
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 0f5ec2d..a9e9610 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -35,6 +35,11 @@ import java.nio.channels.FileChannel;
public abstract class MappableBlockLoader {
/**
+ * Initialize a specific MappableBlockLoader.
+ */
+ abstract void initialize(FsDatasetCache cacheManager) throws IOException;
+
+ /**
* Load the block.
*
* Map the block, and then verify its checksum.
@@ -60,24 +65,48 @@ public abstract class MappableBlockLoader {
/**
* Try to reserve some given bytes.
*
- * @param bytesCount
- * The number of bytes to add.
+ * @param bytesCount The number of bytes to add.
*
- * @return The new number of usedBytes if we succeeded; -1 if we failed.
+ * @return The new number of usedBytes if we succeeded;
+ * -1 if we failed.
*/
abstract long reserve(long bytesCount);
/**
* Release some bytes that we're using.
*
- * @param bytesCount
- * The number of bytes to release.
+ * @param bytesCount The number of bytes to release.
*
- * @return The new number of usedBytes.
+ * @return The new number of usedBytes.
*/
abstract long release(long bytesCount);
/**
+ * Get the config key of cache capacity.
+ */
+ abstract String getCacheCapacityConfigKey();
+
+ /**
+ * Get the approximate amount of cache space used.
+ */
+ abstract long getCacheUsed();
+
+ /**
+ * Get the maximum amount of cache bytes.
+ */
+ abstract long getCacheCapacity();
+
+ /**
+   * Check whether the cache is transient (volatile), i.e. not persisted.
+ */
+ abstract boolean isTransientCache();
+
+ /**
+ * Get a cache file path if applicable. Otherwise return null.
+ */
+ abstract String getCachedPath(ExtendedBlockId key);
+
+ /**
* Reads bytes into a buffer until EOF or the buffer's limit is reached.
*/
protected int fillBuffer(FileChannel channel, ByteBuffer buf)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index d93193e..4b7af19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.io.nativeio.NativeIO;
@@ -42,16 +43,11 @@ import java.nio.channels.FileChannel;
@InterfaceStability.Unstable
public class MemoryMappableBlockLoader extends MappableBlockLoader {
- private final FsDatasetCache cacheManager;
+ private MemoryCacheStats memCacheStats;
- /**
- * Constructs memory mappable loader.
- *
- * @param cacheManager
- * FsDatasetCache reference.
- */
- MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
- this.cacheManager = cacheManager;
+ @Override
+ void initialize(FsDatasetCache cacheManager) throws IOException {
+ this.memCacheStats = cacheManager.getMemCacheStats();
}
/**
@@ -72,7 +68,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
* @return The Mappable block.
*/
@Override
- public MappableBlock load(long length, FileInputStream blockIn,
+ MappableBlock load(long length, FileInputStream blockIn,
FileInputStream metaIn, String blockFileName,
ExtendedBlockId key)
throws IOException {
@@ -153,12 +149,37 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
}
@Override
+ public String getCacheCapacityConfigKey() {
+ return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+ }
+
+ @Override
+ public long getCacheUsed() {
+ return memCacheStats.getCacheUsed();
+ }
+
+ @Override
+ public long getCacheCapacity() {
+ return memCacheStats.getCacheCapacity();
+ }
+
+ @Override
long reserve(long bytesCount) {
- return cacheManager.reserve(bytesCount);
+ return memCacheStats.reserve(bytesCount);
}
@Override
long release(long bytesCount) {
- return cacheManager.release(bytesCount);
+ return memCacheStats.release(bytesCount);
+ }
+
+ @Override
+ public boolean isTransientCache() {
+ return true;
+ }
+
+ @Override
+ public String getCachedPath(ExtendedBlockId key) {
+ return null;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
similarity index 55%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index d93193e..c581d31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -18,46 +18,58 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
- * Maps block to memory.
+ * Maps block to persistent memory by using mapped byte buffer.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class MemoryMappableBlockLoader extends MappableBlockLoader {
+public class PmemMappableBlockLoader extends MappableBlockLoader {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PmemMappableBlockLoader.class);
+ private PmemVolumeManager pmemVolumeManager;
- private final FsDatasetCache cacheManager;
+ @Override
+ void initialize(FsDatasetCache cacheManager) throws IOException {
+ DNConf dnConf = cacheManager.getDnConf();
+ this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(),
+ dnConf.getPmemVolumes());
+ }
- /**
- * Constructs memory mappable loader.
- *
- * @param cacheManager
- * FsDatasetCache reference.
- */
- MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
- this.cacheManager = cacheManager;
+ @VisibleForTesting
+ PmemVolumeManager getPmemVolumeManager() {
+ return pmemVolumeManager;
}
/**
* Load the block.
*
- * mmap and mlock the block, and then verify its checksum.
+ * Map the block and verify its checksum.
+ *
+   * The block will be mapped to PmemDir/BlockPoolId-BlockId, where PmemDir
+   * is a persistent memory volume selected by the getOneVolumeIndex() method.
*
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
@@ -67,43 +79,63 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
* @param blockFileName The block file name, for logging purposes.
* @param key The extended block ID.
*
- * @throws IOException If mapping block to memory fails or checksum fails.
-
+ * @throws IOException If mapping block fails or checksum fails.
+ *
* @return The Mappable block.
*/
@Override
- public MappableBlock load(long length, FileInputStream blockIn,
+ MappableBlock load(long length, FileInputStream blockIn,
FileInputStream metaIn, String blockFileName,
ExtendedBlockId key)
throws IOException {
- MemoryMappedBlock mappableBlock = null;
- MappedByteBuffer mmap = null;
+ PmemMappedBlock mappableBlock = null;
+ String filePath = null;
+
FileChannel blockChannel = null;
+ RandomAccessFile file = null;
+ MappedByteBuffer out = null;
try {
blockChannel = blockIn.getChannel();
if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel.");
}
- mmap = blockChannel.map(FileChannel.MapMode.READ_ONLY, 0, length);
- NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
- verifyChecksum(length, metaIn, blockChannel, blockFileName);
- mappableBlock = new MemoryMappedBlock(mmap, length);
+
+ Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex();
+ filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+ file = new RandomAccessFile(filePath, "rw");
+ out = file.getChannel().
+ map(FileChannel.MapMode.READ_WRITE, 0, length);
+ if (out == null) {
+ throw new IOException("Failed to map the block " + blockFileName +
+ " to persistent storage.");
+ }
+ verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
+ blockFileName);
+ mappableBlock = new PmemMappedBlock(
+ length, volumeIndex, key, pmemVolumeManager);
+ pmemVolumeManager.afterCache(key, volumeIndex);
+ LOG.info("Successfully cached one replica:{} into persistent memory"
+ + ", [cached path={}, length={}]", key, filePath, length);
} finally {
IOUtils.closeQuietly(blockChannel);
+ if (out != null) {
+ NativeIO.POSIX.munmap(out);
+ }
+ IOUtils.closeQuietly(file);
if (mappableBlock == null) {
- if (mmap != null) {
- NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
- }
+        // filePath may be null if the failure occurred before a volume
+        // was selected.
+        if (filePath != null) {
+          FsDatasetUtil.deleteMappedFile(filePath);
+        }
}
}
return mappableBlock;
}
/**
- * Verifies the block's checksum. This is an I/O intensive operation.
+   * Verifies the block's checksum while mapping the block to persistent memory.
+ * This is an I/O intensive operation.
*/
- private void verifyChecksum(long length, FileInputStream metaIn,
- FileChannel blockChannel, String blockFileName)
+ private void verifyChecksumAndMapBlock(
+ MappedByteBuffer out, long length, FileInputStream metaIn,
+ FileChannel blockChannel, String blockFileName)
throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
@@ -115,8 +147,8 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
try {
metaChannel = metaIn.getChannel();
if (metaChannel == null) {
- throw new IOException(
- "Block InputStream meta file has no FileChannel.");
+ throw new IOException("Cannot get FileChannel from " +
+ "Block InputStream meta file.");
}
DataChecksum checksum = header.getChecksum();
final int bytesPerChecksum = checksum.getBytesPerChecksum();
@@ -132,7 +164,9 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
- throw new IOException("checksum verification failed: premature EOF");
+ throw new IOException(
+ "Checksum verification failed for the block " + blockFileName +
+ ": premature EOF");
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
@@ -142,23 +176,55 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
checksumBuf.flip();
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
bytesVerified);
- // Success
+
+      // Copy the verified data to the persistent memory file
+      out.put(blockBuf);
+      // Advance the count of bytes verified so far
bytesVerified += bytesRead;
+
+ // Clear buffer
blockBuf.clear();
checksumBuf.clear();
}
+      // Force the written data to the storage device backing the mapped file
+ out.force();
} finally {
IOUtils.closeQuietly(metaChannel);
}
}
@Override
+ public String getCacheCapacityConfigKey() {
+ return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
+ }
+
+ @Override
+ public long getCacheUsed() {
+ return pmemVolumeManager.getCacheUsed();
+ }
+
+ @Override
+ public long getCacheCapacity() {
+ return pmemVolumeManager.getCacheCapacity();
+ }
+
+ @Override
long reserve(long bytesCount) {
- return cacheManager.reserve(bytesCount);
+ return pmemVolumeManager.reserve(bytesCount);
}
@Override
long release(long bytesCount) {
- return cacheManager.release(bytesCount);
+ return pmemVolumeManager.release(bytesCount);
+ }
+
+ @Override
+ public boolean isTransientCache() {
+ return false;
+ }
+
+ @Override
+ public String getCachedPath(ExtendedBlockId key) {
+ return pmemVolumeManager.getCacheFilePath(key);
}
}
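The pmem write path in load() is the standard Java mapped-byte-buffer pattern: map the cache file READ_WRITE, put() the verified bytes, then force() them to the device backing the file. A standalone sketch under those assumptions (imports as in the file above; the path is hypothetical):

    byte[] data = "example block bytes"
        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    try (RandomAccessFile file =
             new RandomAccessFile("/mnt/pmem0/example", "rw")) {
      MappedByteBuffer out = file.getChannel()
          .map(FileChannel.MapMode.READ_WRITE, 0, data.length);
      out.put(data); // copy into the mapped region
      out.force();   // flush to the storage device backing the mapped file
    }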
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
new file mode 100644
index 0000000..ce4fa22
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Represents an HDFS block that is mapped to persistent memory by DataNode
+ * with mapped byte buffer. PMDK is NOT involved in this implementation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PmemMappedBlock implements MappableBlock {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PmemMappedBlock.class);
+ private final PmemVolumeManager pmemVolumeManager;
+ private long length;
+ private Byte volumeIndex = null;
+ private ExtendedBlockId key;
+
+ PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key,
+ PmemVolumeManager pmemVolumeManager) {
+ assert length > 0;
+ this.length = length;
+ this.volumeIndex = volumeIndex;
+ this.key = key;
+ this.pmemVolumeManager = pmemVolumeManager;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public void close() {
+ String cacheFilePath =
+ pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+ try {
+ FsDatasetUtil.deleteMappedFile(cacheFilePath);
+ pmemVolumeManager.afterUncache(key);
+ LOG.info("Successfully uncached one replica:{} from persistent memory"
+ + ", [cached path={}, length={}]", key, cacheFilePath, length);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete the mapped File: {}!", cacheFilePath, e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
new file mode 100644
index 0000000..76aa2dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manage the persistent memory volumes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PmemVolumeManager {
+
+ /**
+ * Counts used bytes for persistent memory.
+ */
+ private class UsedBytesCount {
+ private final AtomicLong usedBytes = new AtomicLong(0);
+
+ /**
+ * Try to reserve more bytes.
+ *
+ * @param bytesCount The number of bytes to add.
+ *
+ * @return The new number of usedBytes if we succeeded;
+ * -1 if we failed.
+ */
+ long reserve(long bytesCount) {
+ while (true) {
+ long cur = usedBytes.get();
+ long next = cur + bytesCount;
+ if (next > cacheCapacity) {
+ return -1;
+ }
+ if (usedBytes.compareAndSet(cur, next)) {
+ return next;
+ }
+ }
+ }
+
+ /**
+ * Release some bytes that we're using.
+ *
+ * @param bytesCount The number of bytes to release.
+ *
+ * @return The new number of usedBytes.
+ */
+ long release(long bytesCount) {
+ return usedBytes.addAndGet(-bytesCount);
+ }
+
+ long get() {
+ return usedBytes.get();
+ }
+ }
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PmemVolumeManager.class);
+ private final ArrayList<String> pmemVolumes = new ArrayList<>();
+ // Maintain which pmem volume a block is cached to.
+ private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
+ new ConcurrentHashMap<>();
+ private final UsedBytesCount usedBytesCount;
+
+ /**
+ * The total cache capacity in bytes of persistent memory.
+   * It is 0L if the configured MappableBlockLoader cannot cache data to pmem.
+ */
+ private final long cacheCapacity;
+ private int count = 0;
+  // Strict atomicity is not guaranteed, for the sake of performance.
+ private int i = 0;
+
+ PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured)
+ throws IOException {
+ if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) {
+ throw new IOException("The persistent memory volume, " +
+ DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY +
+ " is not configured!");
+ }
+ this.loadVolumes(pmemVolumesConfigured);
+ this.usedBytesCount = new UsedBytesCount();
+ this.cacheCapacity = maxBytes;
+ }
+
+ public long getCacheUsed() {
+ return usedBytesCount.get();
+ }
+
+ public long getCacheCapacity() {
+ return cacheCapacity;
+ }
+
+ /**
+ * Try to reserve more bytes on persistent memory.
+ *
+ * @param bytesCount The number of bytes to add.
+ *
+ * @return The new number of usedBytes if we succeeded;
+ * -1 if we failed.
+ */
+ long reserve(long bytesCount) {
+ return usedBytesCount.reserve(bytesCount);
+ }
+
+ /**
+ * Release some bytes that we're using on persistent memory.
+ *
+ * @param bytesCount The number of bytes to release.
+ *
+ * @return The new number of usedBytes.
+ */
+ long release(long bytesCount) {
+ return usedBytesCount.release(bytesCount);
+ }
+
+ /**
+ * Load and verify the configured pmem volumes.
+ *
+ * @throws IOException If there is no available pmem volume.
+ */
+ private void loadVolumes(String[] volumes) throws IOException {
+ // Check whether the volume exists
+ for (String volume: volumes) {
+ try {
+ File pmemDir = new File(volume);
+ verifyIfValidPmemVolume(pmemDir);
+ // Remove all files under the volume.
+ FileUtils.cleanDirectory(pmemDir);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Failed to parse persistent memory volume " + volume, e);
+ continue;
+ } catch (IOException e) {
+ LOG.error("Bad persistent memory volume: " + volume, e);
+ continue;
+ }
+ pmemVolumes.add(volume);
+ LOG.info("Added persistent memory - " + volume);
+ }
+ count = pmemVolumes.size();
+ if (count == 0) {
+ throw new IOException(
+ "At least one valid persistent memory volume is required!");
+ }
+ }
+
+ @VisibleForTesting
+ static void verifyIfValidPmemVolume(File pmemDir)
+ throws IOException {
+ if (!pmemDir.exists()) {
+ final String message = pmemDir + " does not exist";
+ throw new IOException(message);
+ }
+
+ if (!pmemDir.isDirectory()) {
+ final String message = pmemDir + " is not a directory";
+ throw new IllegalArgumentException(message);
+ }
+
+ String uuidStr = UUID.randomUUID().toString();
+ String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr;
+ byte[] contents = uuidStr.getBytes("UTF-8");
+ RandomAccessFile testFile = null;
+ MappedByteBuffer out = null;
+ try {
+ testFile = new RandomAccessFile(testFilePath, "rw");
+ out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0,
+ contents.length);
+ if (out == null) {
+ throw new IOException("Failed to map the test file under " + pmemDir);
+ }
+ out.put(contents);
+      // Force the written data to the storage device backing the mapped file
+ out.force();
+ } catch (IOException e) {
+ throw new IOException(
+ "Exception while writing data to persistent storage dir: " +
+ pmemDir, e);
+ } finally {
+      if (out != null) {
+        out.clear();
+        // Unmap inside the null check: out is null if map() failed above.
+        NativeIO.POSIX.munmap(out);
+      }
+      if (testFile != null) {
+        IOUtils.closeQuietly(testFile);
+ try {
+ FsDatasetUtil.deleteMappedFile(testFilePath);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete test file " + testFilePath +
+ " from persistent memory", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Choose a persistent memory volume based on a specific algorithm.
+ * Currently it is a round-robin policy.
+ *
+ * TODO: Refine volume selection policy by considering storage utilization.
+ */
+ Byte getOneVolumeIndex() throws IOException {
+ if (count != 0) {
+ return (byte)(i++ % count);
+ } else {
+ throw new IOException("No usable persistent memory is found");
+ }
+ }
+
+ @VisibleForTesting
+ String getVolumeByIndex(Byte index) {
+ return pmemVolumes.get(index);
+ }
+
+ /**
+   * The cache file is named BlockPoolId-BlockId, so its name can be
+   * inferred from the BlockPoolId and BlockId.
+ */
+ public String getCacheFileName(ExtendedBlockId key) {
+ return key.getBlockPoolId() + "-" + key.getBlockId();
+ }
+
+ /**
+   * Since pmem volume sizes are currently below the TB level, it is
+   * tolerable to keep all cache files under one directory. This strategy
+   * will be optimized, especially once a single pmem volume has a huge
+   * cache capacity.
+ *
+ * @param volumeIndex The index of pmem volume where a replica will be
+ * cached to or has been cached to.
+ *
+ * @param key The replica's ExtendedBlockId.
+ *
+ * @return A path to which the block replica is mapped.
+ */
+ public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) {
+ return pmemVolumes.get(volumeIndex) + "/" + getCacheFileName(key);
+ }
+
+ /**
+ * The cache file path is pmemVolume/BlockPoolId-BlockId.
+ */
+ public String getCacheFilePath(ExtendedBlockId key) {
+ Byte volumeIndex = blockKeyToVolume.get(key);
+ if (volumeIndex == null) {
+ return null;
+ }
+ return inferCacheFilePath(volumeIndex, key);
+ }
+
+ @VisibleForTesting
+ Map<ExtendedBlockId, Byte> getBlockKeyToVolume() {
+ return blockKeyToVolume;
+ }
+
+ /**
+   * Record a cached block's ExtendedBlockId and its cache volume index
+   * in the map after caching.
+ */
+ public void afterCache(ExtendedBlockId key, Byte volumeIndex) {
+ blockKeyToVolume.put(key, volumeIndex);
+ }
+
+ /**
+   * Remove the blockKeyToVolume record for a block after uncaching.
+ */
+ public void afterUncache(ExtendedBlockId key) {
+ blockKeyToVolume.remove(key);
+ }
+}
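Putting PmemVolumeManager together: volume selection is round-robin and the cache path is derived from the selected volume plus BlockPoolId-BlockId. A sketch from within the same package (exception handling elided; the directories are hypothetical and must already exist and pass verifyIfValidPmemVolume):

    PmemVolumeManager mgr = new PmemVolumeManager(64L * 1024 * 1024,
        new String[] {"/mnt/pmem0", "/mnt/pmem1"});
    ExtendedBlockId key =
        new ExtendedBlockId(1073741825L, "BP-123-127.0.0.1-1");
    Byte index = mgr.getOneVolumeIndex(); // round-robin: 0, 1, 0, ...
    String path = mgr.inferCacheFilePath(index, key);
    // path == "/mnt/pmem0/BP-123-127.0.0.1-1-1073741825"
    mgr.afterCache(key, index);           // record key -> volume for lookup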
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index e9b35be..8a8b55d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2527,6 +2527,18 @@
</property>
<property>
+ <name>dfs.datanode.cache.loader.class</name>
+ <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader</value>
+ <description>
+    Currently, the available cache loaders are MemoryMappableBlockLoader and
+    PmemMappableBlockLoader. By default, MemoryMappableBlockLoader is used and
+    it maps block replicas into memory. PmemMappableBlockLoader maps block
+    replicas to persistent memory with a mapped byte buffer, implemented in
+    pure Java. The value of dfs.datanode.cache.pmem.dirs specifies the
+    persistent memory directories.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.max.locked.memory</name>
<value>0</value>
<description>
@@ -2544,6 +2556,28 @@
</property>
<property>
+ <name>dfs.datanode.cache.pmem.capacity</name>
+ <value>0</value>
+ <description>
+    The amount of persistent memory in bytes that can be used to cache block
+    replicas. Currently, persistent memory caching is only enabled for the
+    HDFS Centralized Cache Management feature.
+
+ By default, this parameter is 0, which disables persistent memory caching.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.cache.pmem.dirs</name>
+ <value></value>
+ <description>
+    This value specifies the persistent memory directories used for caching
+    block replicas. It takes effect only if the value of
+    dfs.datanode.cache.loader.class is PmemMappableBlockLoader. Multiple
+    directories separated by "," are acceptable.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.list.cache.directives.num.responses</name>
<value>100</value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
new file mode 100644
index 0000000..9b4f06f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import com.google.common.base.Supplier;
+import com.google.common.primitives.Ints;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY;
+
+/**
+ * Tests HDFS persistent memory cache by PmemMappableBlockLoader.
+ *
+ * Bogus persistent memory volumes (ordinary directories) are used to cache
+ * blocks.
+ */
+public class TestCacheByPmemMappableBlockLoader {
+ protected static final org.slf4j.Logger LOG =
+ LoggerFactory.getLogger(TestCacheByPmemMappableBlockLoader.class);
+
+ protected static final long CACHE_CAPACITY = 64 * 1024;
+ protected static final long BLOCK_SIZE = 4 * 1024;
+
+ private static Configuration conf;
+ private static MiniDFSCluster cluster = null;
+ private static DistributedFileSystem fs;
+ private static DataNode dn;
+ private static FsDatasetCache cacheManager;
+ private static PmemMappableBlockLoader cacheLoader;
+ /**
+ * Used to pause DN BPServiceActor threads. BPSA threads acquire the
+ * shared read lock. The test acquires the write lock for exclusive access.
+ */
+ private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
+ private static CacheManipulator prevCacheManipulator;
+ private static DataNodeFaultInjector oldInjector;
+
+ private static final String PMEM_DIR_0 =
+ MiniDFSCluster.getBaseDirectory() + "pmem0";
+ private static final String PMEM_DIR_1 =
+ MiniDFSCluster.getBaseDirectory() + "pmem1";
+
+ static {
+ GenericTestUtils.setLogLevel(
+ LoggerFactory.getLogger(FsDatasetCache.class), Level.DEBUG);
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ oldInjector = DataNodeFaultInjector.get();
+ DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+ @Override
+ public void startOfferService() throws Exception {
+ lock.readLock().lock();
+ }
+
+ @Override
+ public void endOfferService() throws Exception {
+ lock.readLock().unlock();
+ }
+ });
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ DataNodeFaultInjector.set(oldInjector);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new HdfsConfiguration();
+ conf.setLong(
+ DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
+ conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ CACHE_CAPACITY);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
+
+ // Configuration for pmem cache
+ conf.set(DFS_DATANODE_CACHE_LOADER_CLASS,
+ "org.apache.hadoop.hdfs.server.datanode." +
+ "fsdataset.impl.PmemMappableBlockLoader");
+ new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
+ new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
+ // Configure two bogus pmem volumes
+ conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
+ conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY);
+
+ prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+ NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+ cluster.waitActive();
+
+ fs = cluster.getFileSystem();
+ dn = cluster.getDataNodes().get(0);
+ cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
+ cacheLoader = (PmemMappableBlockLoader) cacheManager.getCacheLoader();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
+ }
+
+ protected static void shutdownCluster() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Test
+ public void testPmemVolumeManager() throws IOException {
+ PmemVolumeManager pmemVolumeManager =
+ cacheLoader.getPmemVolumeManager();
+ assertNotNull(pmemVolumeManager);
+ assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());
+ // Test round-robin selection policy
+ long count1 = 0, count2 = 0;
+ for (int i = 0; i < 10; i++) {
+ Byte index = pmemVolumeManager.getOneVolumeIndex();
+ String volume = pmemVolumeManager.getVolumeByIndex(index);
+ if (volume.equals(PMEM_DIR_0)) {
+ count1++;
+ } else if (volume.equals(PMEM_DIR_1)) {
+ count2++;
+ } else {
+ fail("Unexpected persistent storage location:" + volume);
+ }
+ }
+ assertEquals(count1, count2);
+ }
+
+ public List<ExtendedBlockId> getExtendedBlockId(Path filePath, long fileLen)
+ throws IOException {
+ List<ExtendedBlockId> keys = new ArrayList<>();
+ HdfsBlockLocation[] locs =
+ (HdfsBlockLocation[]) fs.getFileBlockLocations(filePath, 0, fileLen);
+ for (HdfsBlockLocation loc : locs) {
+ long bkid = loc.getLocatedBlock().getBlock().getBlockId();
+ String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
+ keys.add(new ExtendedBlockId(bkid, bpid));
+ }
+ return keys;
+ }
+
+ @Test(timeout = 60000)
+ public void testCacheAndUncache() throws Exception {
+ final int maxCacheBlocksNum =
+ Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
+ BlockReaderTestUtil.enableHdfsCachingTracing();
+ Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
+ assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheCapacity());
+
+ final Path testFile = new Path("/testFile");
+ final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
+ DFSTestUtil.createFile(fs, testFile,
+ testFileLen, (short) 1, 0xbeef);
+ List<ExtendedBlockId> blockKeys =
+ getExtendedBlockId(testFile, testFileLen);
+ fs.addCachePool(new CachePoolInfo("testPool"));
+ final long cacheDirectiveId = fs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().setPool("testPool").
+ setPath(testFile).setReplication((short) 1).build());
+ // wait for caching
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+ long blocksCached =
+ MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+ if (blocksCached != maxCacheBlocksNum) {
+ LOG.info("waiting for " + maxCacheBlocksNum + " blocks to " +
+ "be cached. Right now " + blocksCached + " blocks are cached.");
+ return false;
+ }
+ LOG.info(maxCacheBlocksNum + " blocks are now cached.");
+ return true;
+ }
+ }, 1000, 30000);
+
+ // The pmem cache space is expected to have been used up.
+ assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
+ Map<ExtendedBlockId, Byte> blockKeyToVolume =
+ cacheLoader.getPmemVolumeManager().getBlockKeyToVolume();
+ // All block keys should be kept in blockKeyToVolume
+ assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
+ assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
+ // Test each replica's cache file path
+ for (ExtendedBlockId key : blockKeys) {
+ String cachePath = cacheManager.
+ getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
+ // The cachePath shouldn't be null if the replica has been cached
+ // to pmem.
+ assertNotNull(cachePath);
+ String expectFileName =
+ cacheLoader.getPmemVolumeManager().getCacheFileName(key);
+ if (cachePath.startsWith(PMEM_DIR_0)) {
+ assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName));
+ } else if (cachePath.startsWith(PMEM_DIR_1)) {
+ assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName));
+ } else {
+ fail("The cache path is not the expected one: " + cachePath);
+ }
+ }
+
+ // Try to cache another file. Caching this file should fail
+ // due to lack of available cache space.
+ final Path smallTestFile = new Path("/smallTestFile");
+ final long smallTestFileLen = BLOCK_SIZE;
+ DFSTestUtil.createFile(fs, smallTestFile,
+ smallTestFileLen, (short) 1, 0xbeef);
+ // Try to cache more blocks when no cache space is available.
+ final long smallFileCacheDirectiveId = fs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().setPool("testPool").
+ setPath(smallTestFile).setReplication((short) 1).build());
+
+    // Wait long enough to verify that smallTestFile could not be cached.
+ Thread.sleep(10000);
+ MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+ long blocksCached =
+ MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+ // The cached block num should not be increased.
+ assertTrue(blocksCached == maxCacheBlocksNum);
+ // The blockKeyToVolume should just keep the block keys for the testFile.
+ assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
+ assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
+    // Stop trying to cache smallTestFile to avoid interfering with the
+    // verification of uncache functionality.
+ fs.removeCacheDirective(smallFileCacheDirectiveId);
+
+ // Uncache the test file
+ fs.removeCacheDirective(cacheDirectiveId);
+ // Wait for uncaching
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+ long blocksUncached =
+ MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
+ if (blocksUncached != maxCacheBlocksNum) {
+ LOG.info("waiting for " + maxCacheBlocksNum + " blocks to be " +
+ "uncached. Right now " + blocksUncached +
+ " blocks are uncached.");
+ return false;
+ }
+ LOG.info(maxCacheBlocksNum + " blocks have been uncached.");
+ return true;
+ }
+ }, 1000, 30000);
+
+ // It is expected that no pmem cache space is used.
+ assertEquals(0, cacheManager.getPmemCacheUsed());
+ // No record should be kept by blockKeyToVolume after testFile is uncached.
+ assertEquals(blockKeyToVolume.size(), 0);
+ }
+}
\ No newline at end of file