You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2019/03/15 05:47:43 UTC
[hadoop] branch trunk updated: HDFS-14354: Refactor MappableBlock to align with the implementation of SCM cache. Contributed by Feilong He.
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba50a36 HDFS-14354: Refactor MappableBlock to align with the implementation of SCM cache. Contributed by Feilong He.
ba50a36 is described below
commit ba50a36a3ead628c3d44d384f7ed4d2b3a55dd07
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Thu Mar 14 22:21:08 2019 -0700
HDFS-14354: Refactor MappableBlock to align with the implementation of SCM cache. Contributed by Feilong He.
---
.../datanode/fsdataset/impl/FsDatasetCache.java | 15 +-
.../datanode/fsdataset/impl/MappableBlock.java | 155 +--------------------
.../fsdataset/impl/MappableBlockLoader.java | 80 +++++++++++
...leBlock.java => MemoryMappableBlockLoader.java} | 111 +++++----------
.../datanode/fsdataset/impl/MemoryMappedBlock.java | 54 +++++++
5 files changed, 184 insertions(+), 231 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 767b150..9efd11a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -227,6 +227,8 @@ public class FsDatasetCache {
*/
private final long maxBytes;
+ private final MappableBlockLoader mappableBlockLoader;
+
/**
* Number of cache commands that could not be completed successfully
*/
@@ -236,7 +238,7 @@ public class FsDatasetCache {
*/
final AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
- public FsDatasetCache(FsDatasetImpl dataset) {
+ public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
this.dataset = dataset;
this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
ThreadFactory workerFactory = new ThreadFactoryBuilder()
@@ -268,6 +270,7 @@ public class FsDatasetCache {
". Reconfigure this to " + minRevocationPollingMs);
}
this.revocationPollingMs = confRevocationPollingMs;
+ this.mappableBlockLoader = new MemoryMappableBlockLoader();
}
/**
@@ -461,14 +464,14 @@ public class FsDatasetCache {
return;
}
try {
- mappableBlock = MappableBlock.
- load(length, blockIn, metaIn, blockFileName);
+ mappableBlock = mappableBlockLoader.load(length, blockIn, metaIn,
+ blockFileName, key);
} catch (ChecksumException e) {
// Exception message is bogus since this wasn't caused by a file read
LOG.warn("Failed to cache " + key + ": checksum verification failed.");
return;
} catch (IOException e) {
- LOG.warn("Failed to cache " + key, e);
+ LOG.warn("Failed to cache the block [key=" + key + "]!", e);
return;
}
synchronized (FsDatasetCache.this) {
@@ -498,9 +501,7 @@ public class FsDatasetCache {
}
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, usedBytesCount.get());
- if (mappableBlock != null) {
- mappableBlock.close();
- }
+ IOUtils.closeQuietly(mappableBlock);
numBlocksFailedToCache.incrementAndGet();
synchronized (FsDatasetCache.this) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index 45aa364..0fff327 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -18,164 +18,21 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import java.io.BufferedInputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.Closeable;
/**
- * Represents an HDFS block that is mmapped by the DataNode.
+ * Represents an HDFS block that is mapped by the DataNode.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class MappableBlock implements Closeable {
- private MappedByteBuffer mmap;
- private final long length;
-
- MappableBlock(MappedByteBuffer mmap, long length) {
- this.mmap = mmap;
- this.length = length;
- assert length > 0;
- }
-
- public long getLength() {
- return length;
- }
-
- /**
- * Load the block.
- *
- * mmap and mlock the block, and then verify its checksum.
- *
- * @param length The current length of the block.
- * @param blockIn The block input stream. Should be positioned at the
- * start. The caller must close this.
- * @param metaIn The meta file input stream. Should be positioned at
- * the start. The caller must close this.
- * @param blockFileName The block file name, for logging purposes.
- *
- * @return The Mappable block.
- */
- public static MappableBlock load(long length,
- FileInputStream blockIn, FileInputStream metaIn,
- String blockFileName) throws IOException {
- MappableBlock mappableBlock = null;
- MappedByteBuffer mmap = null;
- FileChannel blockChannel = null;
- try {
- blockChannel = blockIn.getChannel();
- if (blockChannel == null) {
- throw new IOException("Block InputStream has no FileChannel.");
- }
- mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
- NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
- verifyChecksum(length, metaIn, blockChannel, blockFileName);
- mappableBlock = new MappableBlock(mmap, length);
- } finally {
- IOUtils.closeQuietly(blockChannel);
- if (mappableBlock == null) {
- if (mmap != null) {
- NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
- }
- }
- }
- return mappableBlock;
- }
-
- /**
- * Verifies the block's checksum. This is an I/O intensive operation.
- */
- private static void verifyChecksum(long length,
- FileInputStream metaIn, FileChannel blockChannel, String blockFileName)
- throws IOException, ChecksumException {
- // Verify the checksum from the block's meta file
- // Get the DataChecksum from the meta file header
- BlockMetadataHeader header =
- BlockMetadataHeader.readHeader(new DataInputStream(
- new BufferedInputStream(metaIn, BlockMetadataHeader
- .getHeaderSize())));
- FileChannel metaChannel = null;
- try {
- metaChannel = metaIn.getChannel();
- if (metaChannel == null) {
- throw new IOException("Block InputStream meta file has no FileChannel.");
- }
- DataChecksum checksum = header.getChecksum();
- final int bytesPerChecksum = checksum.getBytesPerChecksum();
- final int checksumSize = checksum.getChecksumSize();
- final int numChunks = (8*1024*1024) / bytesPerChecksum;
- ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
- ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
- // Verify the checksum
- int bytesVerified = 0;
- while (bytesVerified < length) {
- Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
- "Unexpected partial chunk before EOF");
- assert bytesVerified % bytesPerChecksum == 0;
- int bytesRead = fillBuffer(blockChannel, blockBuf);
- if (bytesRead == -1) {
- throw new IOException("checksum verification failed: premature EOF");
- }
- blockBuf.flip();
- // Number of read chunks, including partial chunk at end
- int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
- checksumBuf.limit(chunks*checksumSize);
- fillBuffer(metaChannel, checksumBuf);
- checksumBuf.flip();
- checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
- bytesVerified);
- // Success
- bytesVerified += bytesRead;
- blockBuf.clear();
- checksumBuf.clear();
- }
- } finally {
- IOUtils.closeQuietly(metaChannel);
- }
- }
+public interface MappableBlock extends Closeable {
/**
- * Reads bytes into a buffer until EOF or the buffer's limit is reached
+ * Get the number of bytes that have been cached.
+ * @return the number of bytes that have been cached.
*/
- private static int fillBuffer(FileChannel channel, ByteBuffer buf)
- throws IOException {
- int bytesRead = channel.read(buf);
- if (bytesRead < 0) {
- //EOF
- return bytesRead;
- }
- while (buf.remaining() > 0) {
- int n = channel.read(buf);
- if (n < 0) {
- //EOF
- return bytesRead;
- }
- bytesRead += n;
- }
- return bytesRead;
- }
-
- @Override
- public void close() {
- if (mmap != null) {
- NativeIO.POSIX.munmap(mmap);
- mmap = null;
- }
- }
+ long getLength();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
new file mode 100644
index 0000000..a323f78
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Maps block to DataNode cache region.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class MappableBlockLoader {
+
+ /**
+ * Load the block.
+ *
+ * Map the block, and then verify its checksum.
+ *
+ * @param length The current length of the block.
+ * @param blockIn The block input stream. Should be positioned at the
+ * start. The caller must close this.
+ * @param metaIn The meta file input stream. Should be positioned at
+ * the start. The caller must close this.
+ * @param blockFileName The block file name, for logging purposes.
+ * @param key The extended block ID.
+ *
+ * @throws IOException If mapping block to cache region fails or checksum
+ * fails.
+ *
+ * @return The Mappable block.
+ */
+ abstract MappableBlock load(long length, FileInputStream blockIn,
+ FileInputStream metaIn, String blockFileName,
+ ExtendedBlockId key)
+ throws IOException;
+
+ /**
+ * Reads bytes into a buffer until EOF or the buffer's limit is reached.
+ */
+ protected int fillBuffer(FileChannel channel, ByteBuffer buf)
+ throws IOException {
+ int bytesRead = channel.read(buf);
+ if (bytesRead < 0) {
+ //EOF
+ return bytesRead;
+ }
+ while (buf.remaining() > 0) {
+ int n = channel.read(buf);
+ if (n < 0) {
+ //EOF
+ return bytesRead;
+ }
+ bytesRead += n;
+ }
+ return bytesRead;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
similarity index 64%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 45aa364..7a0f7c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -18,45 +18,29 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import java.io.BufferedInputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-
+import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
/**
- * Represents an HDFS block that is mmapped by the DataNode.
+ * Maps block to memory.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class MappableBlock implements Closeable {
- private MappedByteBuffer mmap;
- private final long length;
-
- MappableBlock(MappedByteBuffer mmap, long length) {
- this.mmap = mmap;
- this.length = length;
- assert length > 0;
- }
-
- public long getLength() {
- return length;
- }
+public class MemoryMappableBlockLoader extends MappableBlockLoader {
/**
* Load the block.
@@ -64,18 +48,23 @@ public class MappableBlock implements Closeable {
* mmap and mlock the block, and then verify its checksum.
*
* @param length The current length of the block.
- * @param blockIn The block input stream. Should be positioned at the
- * start. The caller must close this.
- * @param metaIn The meta file input stream. Should be positioned at
- * the start. The caller must close this.
+ * @param blockIn The block input stream. Should be positioned at the
+ * start. The caller must close this.
+ * @param metaIn The meta file input stream. Should be positioned at
+ * the start. The caller must close this.
* @param blockFileName The block file name, for logging purposes.
+ * @param key The extended block ID.
*
+ * @throws IOException If mapping block to memory fails or checksum fails.
+
* @return The Mappable block.
*/
- public static MappableBlock load(long length,
- FileInputStream blockIn, FileInputStream metaIn,
- String blockFileName) throws IOException {
- MappableBlock mappableBlock = null;
+ @Override
+ public MappableBlock load(long length, FileInputStream blockIn,
+ FileInputStream metaIn, String blockFileName,
+ ExtendedBlockId key)
+ throws IOException {
+ MemoryMappedBlock mappableBlock = null;
MappedByteBuffer mmap = null;
FileChannel blockChannel = null;
try {
@@ -83,10 +72,10 @@ public class MappableBlock implements Closeable {
if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel.");
}
- mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
+ mmap = blockChannel.map(FileChannel.MapMode.READ_ONLY, 0, length);
NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
verifyChecksum(length, metaIn, blockChannel, blockFileName);
- mappableBlock = new MappableBlock(mmap, length);
+ mappableBlock = new MemoryMappedBlock(mmap, length);
} finally {
IOUtils.closeQuietly(blockChannel);
if (mappableBlock == null) {
@@ -101,9 +90,9 @@ public class MappableBlock implements Closeable {
/**
* Verifies the block's checksum. This is an I/O intensive operation.
*/
- private static void verifyChecksum(long length,
- FileInputStream metaIn, FileChannel blockChannel, String blockFileName)
- throws IOException, ChecksumException {
+ public void verifyChecksum(long length, FileInputStream metaIn,
+ FileChannel blockChannel, String blockFileName)
+ throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =
@@ -114,14 +103,15 @@ public class MappableBlock implements Closeable {
try {
metaChannel = metaIn.getChannel();
if (metaChannel == null) {
- throw new IOException("Block InputStream meta file has no FileChannel.");
+ throw new IOException(
+ "Block InputStream meta file has no FileChannel.");
}
DataChecksum checksum = header.getChecksum();
final int bytesPerChecksum = checksum.getBytesPerChecksum();
final int checksumSize = checksum.getChecksumSize();
- final int numChunks = (8*1024*1024) / bytesPerChecksum;
- ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
- ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
+ final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
+ ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
+ ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
// Verify the checksum
int bytesVerified = 0;
while (bytesVerified < length) {
@@ -134,8 +124,8 @@ public class MappableBlock implements Closeable {
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
- int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
- checksumBuf.limit(chunks*checksumSize);
+ int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
+ checksumBuf.limit(chunks * checksumSize);
fillBuffer(metaChannel, checksumBuf);
checksumBuf.flip();
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
@@ -149,33 +139,4 @@ public class MappableBlock implements Closeable {
IOUtils.closeQuietly(metaChannel);
}
}
-
- /**
- * Reads bytes into a buffer until EOF or the buffer's limit is reached
- */
- private static int fillBuffer(FileChannel channel, ByteBuffer buf)
- throws IOException {
- int bytesRead = channel.read(buf);
- if (bytesRead < 0) {
- //EOF
- return bytesRead;
- }
- while (buf.remaining() > 0) {
- int n = channel.read(buf);
- if (n < 0) {
- //EOF
- return bytesRead;
- }
- bytesRead += n;
- }
- return bytesRead;
- }
-
- @Override
- public void close() {
- if (mmap != null) {
- NativeIO.POSIX.munmap(mmap);
- mmap = null;
- }
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
new file mode 100644
index 0000000..c09ad1a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.nio.MappedByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+/**
+ * Represents an HDFS block that is mapped to memory by the DataNode.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MemoryMappedBlock implements MappableBlock {
+ private MappedByteBuffer mmap;
+ private final long length;
+
+ MemoryMappedBlock(MappedByteBuffer mmap, long length) {
+ this.mmap = mmap;
+ this.length = length;
+ assert length > 0;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public void close() {
+ if (mmap != null) {
+ NativeIO.POSIX.munmap(mmap);
+ mmap = null;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org