You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2019/10/27 17:09:01 UTC
[hadoop] 05/10: HDFS-14402. Use FileChannel.transferTo() method for
transferring block to SCM cache. Contributed by Feilong He.
This is an automated email from the ASF dual-hosted git repository.
rakeshr pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit f982f9662322dbe32e1d5a703f0fabc8720815e2
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Sun May 26 14:30:11 2019 +0530
HDFS-14402. Use FileChannel.transferTo() method for transferring block to SCM cache. Contributed by Feilong He.
(cherry picked from commit 37900c5639f8ba8d41b9fedc3d41ee0fbda7d5db)
---
.../fsdataset/impl/MappableBlockLoader.java | 59 +++++++++++
.../fsdataset/impl/MemoryMappableBlockLoader.java | 59 -----------
.../fsdataset/impl/PmemMappableBlockLoader.java | 110 +++------------------
3 files changed, 75 insertions(+), 153 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 044e5c5..3ec8416 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -18,10 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.util.DataChecksum;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -108,6 +114,59 @@ public abstract class MappableBlockLoader {
}
/**
+ * Verifies the checksums of {@code length} bytes read from
+ * {@code blockChannel} against the meta file {@code metaIn}, whose channel
+ * is closed on return. This is an I/O intensive operation.
+ * @throws IOException if metaIn has no channel or the data ends early
+ */
+ protected void verifyChecksum(long length, FileInputStream metaIn,
+ FileChannel blockChannel, String blockFileName)
+ throws IOException {
+ // Get the DataChecksum from the header of the block's meta file
+ BlockMetadataHeader header =
+ BlockMetadataHeader.readHeader(new DataInputStream(
+ new BufferedInputStream(metaIn, BlockMetadataHeader
+ .getHeaderSize())));
+ FileChannel metaChannel = null;
+ try {
+ metaChannel = metaIn.getChannel();
+ if (metaChannel == null) {
+ throw new IOException(
+ "Block InputStream meta file has no FileChannel.");
+ }
+ DataChecksum checksum = header.getChecksum();
+ final int bytesPerChecksum = checksum.getBytesPerChecksum();
+ final int checksumSize = checksum.getChecksumSize();
+ final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
+ ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
+ ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
+ // long, not int: length is a long and may exceed Integer.MAX_VALUE
+ long bytesVerified = 0;
+ while (bytesVerified < length) {
+ Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
+ "Unexpected partial chunk before EOF");
+ int bytesRead = fillBuffer(blockChannel, blockBuf);
+ if (bytesRead == -1) {
+ throw new IOException("checksum verification failed: premature EOF");
+ }
+ blockBuf.flip();
+ // Number of read chunks, including partial chunk at end
+ int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
+ checksumBuf.limit(chunks * checksumSize);
+ fillBuffer(metaChannel, checksumBuf);
+ checksumBuf.flip();
+ checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
+ bytesVerified);
+ bytesVerified += bytesRead;
+ blockBuf.clear();
+ checksumBuf.clear();
+ }
+ } finally {
+ IOUtils.closeQuietly(metaChannel);
+ }
+ }
+
+ /**
* Reads bytes into a buffer until EOF or the buffer's limit is reached.
*/
protected int fillBuffer(FileChannel channel, ByteBuffer buf)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 919835a..52d8d93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -18,22 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@@ -98,59 +92,6 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
return mappableBlock;
}
- /**
- * Verifies the block's checksum. This is an I/O intensive operation.
- */
- private void verifyChecksum(long length, FileInputStream metaIn,
- FileChannel blockChannel, String blockFileName)
- throws IOException {
- // Verify the checksum from the block's meta file
- // Get the DataChecksum from the meta file header
- BlockMetadataHeader header =
- BlockMetadataHeader.readHeader(new DataInputStream(
- new BufferedInputStream(metaIn, BlockMetadataHeader
- .getHeaderSize())));
- FileChannel metaChannel = null;
- try {
- metaChannel = metaIn.getChannel();
- if (metaChannel == null) {
- throw new IOException(
- "Block InputStream meta file has no FileChannel.");
- }
- DataChecksum checksum = header.getChecksum();
- final int bytesPerChecksum = checksum.getBytesPerChecksum();
- final int checksumSize = checksum.getChecksumSize();
- final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
- ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
- ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
- // Verify the checksum
- int bytesVerified = 0;
- while (bytesVerified < length) {
- Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
- "Unexpected partial chunk before EOF");
- assert bytesVerified % bytesPerChecksum == 0;
- int bytesRead = fillBuffer(blockChannel, blockBuf);
- if (bytesRead == -1) {
- throw new IOException("checksum verification failed: premature EOF");
- }
- blockBuf.flip();
- // Number of read chunks, including partial chunk at end
- int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
- checksumBuf.limit(chunks * checksumSize);
- fillBuffer(metaChannel, checksumBuf);
- checksumBuf.flip();
- checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
- bytesVerified);
- // Success
- bytesVerified += bytesRead;
- blockBuf.clear();
- checksumBuf.clear();
- }
- } finally {
- IOUtils.closeQuietly(metaChannel);
- }
- }
-
@Override
public long getCacheUsed() {
return memCacheStats.getCacheUsed();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 05a9ba7..239fff8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -18,25 +18,17 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
@@ -79,112 +71,42 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
*/
@Override
MappableBlock load(long length, FileInputStream blockIn,
- FileInputStream metaIn, String blockFileName,
- ExtendedBlockId key)
+ FileInputStream metaIn, String blockFileName,
+ ExtendedBlockId key)
throws IOException {
PmemMappedBlock mappableBlock = null;
- String filePath = null;
+ String cachePath = null;
FileChannel blockChannel = null;
- RandomAccessFile file = null;
- MappedByteBuffer out = null;
+ RandomAccessFile cacheFile = null;
try {
blockChannel = blockIn.getChannel();
if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel.");
}
+ cachePath = pmemVolumeManager.getCachePath(key);
+ cacheFile = new RandomAccessFile(cachePath, "rw");
+ FileChannel cacheChannel = cacheFile.getChannel();
+ // Copy fully (transferTo() may stop short), then verify the cached copy.
+ for (long pos = 0, n = 1; pos < length && n > 0; pos += n) {
+ n = blockChannel.transferTo(pos, length - pos, cacheChannel);
+ }
+ verifyChecksum(length, metaIn, cacheChannel.position(0), blockFileName);
- filePath = pmemVolumeManager.getCachePath(key);
- file = new RandomAccessFile(filePath, "rw");
- out = file.getChannel().
- map(FileChannel.MapMode.READ_WRITE, 0, length);
- if (out == null) {
- throw new IOException("Failed to map the block " + blockFileName +
- " to persistent storage.");
- }
- verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
- blockFileName);
mappableBlock = new PmemMappedBlock(length, key);
LOG.info("Successfully cached one replica:{} into persistent memory"
- + ", [cached path={}, length={}]", key, filePath, length);
+ + ", [cached path={}, length={}]", key, cachePath, length);
} finally {
IOUtils.closeQuietly(blockChannel);
- if (out != null) {
- NativeIO.POSIX.munmap(out);
- }
- IOUtils.closeQuietly(file);
+ IOUtils.closeQuietly(cacheFile);
if (mappableBlock == null) {
- LOG.debug("Delete {} due to unsuccessful mapping.", filePath);
- FsDatasetUtil.deleteMappedFile(filePath);
+ LOG.debug("Delete {} due to unsuccessful mapping.", cachePath);
+ FsDatasetUtil.deleteMappedFile(cachePath);
}
}
return mappableBlock;
}
- /**
- * Verifies the block's checksum meanwhile maps block to persistent memory.
- * This is an I/O intensive operation.
- */
- private void verifyChecksumAndMapBlock(
- MappedByteBuffer out, long length, FileInputStream metaIn,
- FileChannel blockChannel, String blockFileName)
- throws IOException {
- // Verify the checksum from the block's meta file
- // Get the DataChecksum from the meta file header
- BlockMetadataHeader header =
- BlockMetadataHeader.readHeader(new DataInputStream(
- new BufferedInputStream(metaIn, BlockMetadataHeader
- .getHeaderSize())));
- FileChannel metaChannel = null;
- try {
- metaChannel = metaIn.getChannel();
- if (metaChannel == null) {
- throw new IOException("Cannot get FileChannel from " +
- "Block InputStream meta file.");
- }
- DataChecksum checksum = header.getChecksum();
- final int bytesPerChecksum = checksum.getBytesPerChecksum();
- final int checksumSize = checksum.getChecksumSize();
- final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
- ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
- ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
- // Verify the checksum
- int bytesVerified = 0;
- while (bytesVerified < length) {
- Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
- "Unexpected partial chunk before EOF");
- assert bytesVerified % bytesPerChecksum == 0;
- int bytesRead = fillBuffer(blockChannel, blockBuf);
- if (bytesRead == -1) {
- throw new IOException(
- "Checksum verification failed for the block " + blockFileName +
- ": premature EOF");
- }
- blockBuf.flip();
- // Number of read chunks, including partial chunk at end
- int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
- checksumBuf.limit(chunks * checksumSize);
- fillBuffer(metaChannel, checksumBuf);
- checksumBuf.flip();
- checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
- bytesVerified);
-
- // / Copy data to persistent file
- out.put(blockBuf);
- // positioning the
- bytesVerified += bytesRead;
-
- // Clear buffer
- blockBuf.clear();
- checksumBuf.clear();
- }
- // Forces to write data to storage device containing the mapped file
- out.force();
- } finally {
- IOUtils.closeQuietly(metaChannel);
- }
- }
-
@Override
public long getCacheUsed() {
return pmemVolumeManager.getCacheUsed();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org