Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2019/10/27 17:09:01 UTC

[hadoop] 05/10: HDFS-14402. Use FileChannel.transferTo() method for transferring block to SCM cache. Contributed by Feilong He.

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f982f9662322dbe32e1d5a703f0fabc8720815e2
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Sun May 26 14:30:11 2019 +0530

    HDFS-14402. Use FileChannel.transferTo() method for transferring block to SCM cache. Contributed by Feilong He.
    
    (cherry picked from commit 37900c5639f8ba8d41b9fedc3d41ee0fbda7d5db)
---
 .../fsdataset/impl/MappableBlockLoader.java        |  59 +++++++++++
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |  59 -----------
 .../fsdataset/impl/PmemMappableBlockLoader.java    | 110 +++------------------
 3 files changed, 75 insertions(+), 153 deletions(-)
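
In short, this change hoists the checksum-verification logic duplicated
across MemoryMappableBlockLoader and PmemMappableBlockLoader into the shared
MappableBlockLoader base class, and replaces the mmap()/MappedByteBuffer copy
path in PmemMappableBlockLoader with a FileChannel.transferTo() copy into the
persistent-memory (SCM) cache file. A minimal sketch of the transferTo()
pattern, with hypothetical channel and file names (the commit itself issues a
single transferTo() call for the whole block; the loop below is the defensive
form, since transferTo() may move fewer bytes than requested):

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;

    public class TransferToCopySketch {
      /**
       * Copies length bytes from src to dst via the kernel-assisted
       * transferTo() path. transferTo() may transfer fewer bytes than
       * asked for, so the loop retries until the whole range is moved.
       */
      static void copy(FileChannel src, FileChannel dst, long length)
          throws IOException {
        long pos = 0;
        while (pos < length) {
          long n = src.transferTo(pos, length - pos, dst);
          if (n <= 0) {
            throw new EOFException("premature EOF after " + pos + " bytes");
          }
          pos += n;
        }
      }

      public static void main(String[] args) throws IOException {
        // File names here are illustrative only.
        try (RandomAccessFile in = new RandomAccessFile("blk_1001", "r");
             RandomAccessFile out = new RandomAccessFile("cache_1001", "rw")) {
          copy(in.getChannel(), out.getChannel(), in.length());
        }
      }
    }

Compared with the previous MappedByteBuffer.put() path, transferTo() lets the
kernel move the bytes directly between the two files and removes the explicit
NativeIO.POSIX.munmap() cleanup from the finally block.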

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 044e5c5..3ec8416 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -18,10 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.util.DataChecksum;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -108,6 +114,59 @@ public abstract class MappableBlockLoader {
   }
 
   /**
+   * Verifies the block's checksum. This is an I/O intensive operation.
+   */
+  protected void verifyChecksum(long length, FileInputStream metaIn,
+                                FileChannel blockChannel, String blockFileName)
+      throws IOException {
+    // Verify the checksum from the block's meta file
+    // Get the DataChecksum from the meta file header
+    BlockMetadataHeader header =
+        BlockMetadataHeader.readHeader(new DataInputStream(
+            new BufferedInputStream(metaIn, BlockMetadataHeader
+                .getHeaderSize())));
+    FileChannel metaChannel = null;
+    try {
+      metaChannel = metaIn.getChannel();
+      if (metaChannel == null) {
+        throw new IOException(
+            "Block InputStream meta file has no FileChannel.");
+      }
+      DataChecksum checksum = header.getChecksum();
+      final int bytesPerChecksum = checksum.getBytesPerChecksum();
+      final int checksumSize = checksum.getChecksumSize();
+      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
+      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
+      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
+      // Verify the checksum
+      int bytesVerified = 0;
+      while (bytesVerified < length) {
+        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
+            "Unexpected partial chunk before EOF");
+        assert bytesVerified % bytesPerChecksum == 0;
+        int bytesRead = fillBuffer(blockChannel, blockBuf);
+        if (bytesRead == -1) {
+          throw new IOException("checksum verification failed: premature EOF");
+        }
+        blockBuf.flip();
+        // Number of read chunks, including partial chunk at end
+        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
+        checksumBuf.limit(chunks * checksumSize);
+        fillBuffer(metaChannel, checksumBuf);
+        checksumBuf.flip();
+        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
+            bytesVerified);
+        // Success
+        bytesVerified += bytesRead;
+        blockBuf.clear();
+        checksumBuf.clear();
+      }
+    } finally {
+      IOUtils.closeQuietly(metaChannel);
+    }
+  }
+
+  /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
    */
   protected int fillBuffer(FileChannel channel, ByteBuffer buf)
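
The verification loop added above reads the block in batches of numChunks
whole checksum chunks (roughly 8 MiB of data) and sizes the checksum buffer
to match, counting a trailing partial chunk with ceiling division. A worked
sizing example, assuming the stock HDFS configuration of 512 bytes per
checksum (the dfs.bytes-per-checksum default) and 4-byte CRC32 checksums:

    import java.nio.ByteBuffer;

    public class ChecksumBufferSizingSketch {
      public static void main(String[] args) {
        final int bytesPerChecksum = 512;  // dfs.bytes-per-checksum default
        final int checksumSize = 4;        // CRC32/CRC32C width in bytes
        // 8 MiB divided into whole chunks: 16384 chunks here.
        final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
        ByteBuffer blockBuf =
            ByteBuffer.allocate(numChunks * bytesPerChecksum);  // 8 MiB
        ByteBuffer checksumBuf =
            ByteBuffer.allocate(numChunks * checksumSize);      // 64 KiB
        // A short read at EOF leaves a partial chunk; ceiling division
        // counts it, e.g. a 1000-byte read spans two 512-byte chunks.
        int bytesRead = 1000;
        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
        System.out.printf("block=%d sums=%d chunks=%d%n",
            blockBuf.capacity(), checksumBuf.capacity(), chunks);
      }
    }
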
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 919835a..52d8d93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -18,22 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
@@ -98,59 +92,6 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
     return mappableBlock;
   }
 
-  /**
-   * Verifies the block's checksum. This is an I/O intensive operation.
-   */
-  private void verifyChecksum(long length, FileInputStream metaIn,
-                             FileChannel blockChannel, String blockFileName)
-      throws IOException {
-    // Verify the checksum from the block's meta file
-    // Get the DataChecksum from the meta file header
-    BlockMetadataHeader header =
-        BlockMetadataHeader.readHeader(new DataInputStream(
-            new BufferedInputStream(metaIn, BlockMetadataHeader
-                .getHeaderSize())));
-    FileChannel metaChannel = null;
-    try {
-      metaChannel = metaIn.getChannel();
-      if (metaChannel == null) {
-        throw new IOException(
-            "Block InputStream meta file has no FileChannel.");
-      }
-      DataChecksum checksum = header.getChecksum();
-      final int bytesPerChecksum = checksum.getBytesPerChecksum();
-      final int checksumSize = checksum.getChecksumSize();
-      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
-      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
-      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
-      // Verify the checksum
-      int bytesVerified = 0;
-      while (bytesVerified < length) {
-        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
-        assert bytesVerified % bytesPerChecksum == 0;
-        int bytesRead = fillBuffer(blockChannel, blockBuf);
-        if (bytesRead == -1) {
-          throw new IOException("checksum verification failed: premature EOF");
-        }
-        blockBuf.flip();
-        // Number of read chunks, including partial chunk at end
-        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
-        checksumBuf.limit(chunks * checksumSize);
-        fillBuffer(metaChannel, checksumBuf);
-        checksumBuf.flip();
-        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
-            bytesVerified);
-        // Success
-        bytesVerified += bytesRead;
-        blockBuf.clear();
-        checksumBuf.clear();
-      }
-    } finally {
-      IOUtils.closeQuietly(metaChannel);
-    }
-  }
-
   @Override
   public long getCacheUsed() {
     return memCacheStats.getCacheUsed();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 05a9ba7..239fff8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -18,25 +18,17 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
@@ -79,112 +71,42 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
    */
   @Override
   MappableBlock load(long length, FileInputStream blockIn,
-                            FileInputStream metaIn, String blockFileName,
-                            ExtendedBlockId key)
+                     FileInputStream metaIn, String blockFileName,
+                     ExtendedBlockId key)
       throws IOException {
     PmemMappedBlock mappableBlock = null;
-    String filePath = null;
+    String cachePath = null;
 
     FileChannel blockChannel = null;
-    RandomAccessFile file = null;
-    MappedByteBuffer out = null;
+    RandomAccessFile cacheFile = null;
     try {
       blockChannel = blockIn.getChannel();
       if (blockChannel == null) {
         throw new IOException("Block InputStream has no FileChannel.");
       }
+      cachePath = pmemVolumeManager.getCachePath(key);
+      cacheFile = new RandomAccessFile(cachePath, "rw");
+      blockChannel.transferTo(0, length, cacheFile.getChannel());
+
+      // Verify checksum for the cached data instead of block file.
+      // The file channel should be repositioned.
+      cacheFile.getChannel().position(0);
+      verifyChecksum(length, metaIn, cacheFile.getChannel(), blockFileName);
 
-      filePath = pmemVolumeManager.getCachePath(key);
-      file = new RandomAccessFile(filePath, "rw");
-      out = file.getChannel().
-          map(FileChannel.MapMode.READ_WRITE, 0, length);
-      if (out == null) {
-        throw new IOException("Failed to map the block " + blockFileName +
-            " to persistent storage.");
-      }
-      verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
-          blockFileName);
       mappableBlock = new PmemMappedBlock(length, key);
       LOG.info("Successfully cached one replica:{} into persistent memory"
-          + ", [cached path={}, length={}]", key, filePath, length);
+          + ", [cached path={}, length={}]", key, cachePath, length);
     } finally {
       IOUtils.closeQuietly(blockChannel);
-      if (out != null) {
-        NativeIO.POSIX.munmap(out);
-      }
-      IOUtils.closeQuietly(file);
+      IOUtils.closeQuietly(cacheFile);
       if (mappableBlock == null) {
-        LOG.debug("Delete {} due to unsuccessful mapping.", filePath);
-        FsDatasetUtil.deleteMappedFile(filePath);
+        LOG.debug("Delete {} due to unsuccessful mapping.", cachePath);
+        FsDatasetUtil.deleteMappedFile(cachePath);
       }
     }
     return mappableBlock;
   }
 
-  /**
-   * Verifies the block's checksum meanwhile maps block to persistent memory.
-   * This is an I/O intensive operation.
-   */
-  private void verifyChecksumAndMapBlock(
-      MappedByteBuffer out, long length, FileInputStream metaIn,
-      FileChannel blockChannel, String blockFileName)
-      throws IOException {
-    // Verify the checksum from the block's meta file
-    // Get the DataChecksum from the meta file header
-    BlockMetadataHeader header =
-        BlockMetadataHeader.readHeader(new DataInputStream(
-            new BufferedInputStream(metaIn, BlockMetadataHeader
-                .getHeaderSize())));
-    FileChannel metaChannel = null;
-    try {
-      metaChannel = metaIn.getChannel();
-      if (metaChannel == null) {
-        throw new IOException("Cannot get FileChannel from " +
-            "Block InputStream meta file.");
-      }
-      DataChecksum checksum = header.getChecksum();
-      final int bytesPerChecksum = checksum.getBytesPerChecksum();
-      final int checksumSize = checksum.getChecksumSize();
-      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
-      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
-      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
-      // Verify the checksum
-      int bytesVerified = 0;
-      while (bytesVerified < length) {
-        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
-        assert bytesVerified % bytesPerChecksum == 0;
-        int bytesRead = fillBuffer(blockChannel, blockBuf);
-        if (bytesRead == -1) {
-          throw new IOException(
-              "Checksum verification failed for the block " + blockFileName +
-                  ": premature EOF");
-        }
-        blockBuf.flip();
-        // Number of read chunks, including partial chunk at end
-        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
-        checksumBuf.limit(chunks * checksumSize);
-        fillBuffer(metaChannel, checksumBuf);
-        checksumBuf.flip();
-        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
-            bytesVerified);
-
-        // / Copy data to persistent file
-        out.put(blockBuf);
-        // positioning the
-        bytesVerified += bytesRead;
-
-        // Clear buffer
-        blockBuf.clear();
-        checksumBuf.clear();
-      }
-      // Forces to write data to storage device containing the mapped file
-      out.force();
-    } finally {
-      IOUtils.closeQuietly(metaChannel);
-    }
-  }
-
   @Override
   public long getCacheUsed() {
     return pmemVolumeManager.getCacheUsed();
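
Two details of the new load() path are worth calling out. First, the checksum
is now verified against the bytes that actually landed in the cache file
rather than against the source block stream, so corruption introduced during
the copy itself is detected; because writing through the channel advances its
position, the cache channel is rewound to offset 0 before verification.
Second, both loaders now share the same verifyChunkedSums() contract, which
can be exercised in isolation. A minimal round trip, assuming Hadoop's
org.apache.hadoop.util.DataChecksum API as used in the diff above, with
illustrative sizes:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.util.DataChecksum;

    public class ChunkedSumsRoundTripSketch {
      public static void main(String[] args) throws Exception {
        DataChecksum checksum =
            DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
        // Two full 512-byte chunks of sample data.
        ByteBuffer data = ByteBuffer.allocate(2 * 512);
        for (int i = 0; i < data.capacity(); i++) {
          data.put((byte) i);
        }
        data.flip();
        // One checksum per chunk.
        ByteBuffer sums =
            ByteBuffer.allocate(2 * checksum.getChecksumSize());
        checksum.calculateChunkedSums(data, sums);
        // verifyChunkedSums() throws ChecksumException on a mismatch; the
        // file name and base position are only used in the error report.
        checksum.verifyChunkedSums(data, sums, "example-block", 0);
        System.out.println("checksums verified");
      }
    }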

