You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by so...@apache.org on 2020/08/12 08:54:09 UTC
[hadoop] branch branch-3.3 updated: HDFS-15493. Update block map
and name cache in parallel while loading fsimage. Contributed by Chengwei
Wang
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 42154f0 HDFS-15493. Update block map and name cache in parallel while loading fsimage. Contributed by Chengwei Wang
42154f0 is described below
commit 42154f04e33b2f5d15b4629be71047db410b2759
Author: S O'Donnell <so...@cloudera.com>
AuthorDate: Wed Aug 12 09:02:47 2020 +0100
HDFS-15493. Update block map and name cache in parallel while loading fsimage. Contributed by Chengwei Wang
(cherry picked from commit 10716040a859b7127a4e1781be21c3b0b59dd456)
---
.../hdfs/server/namenode/FSImageFormatPBINode.java | 119 ++++++++++++++-------
.../server/namenode/FSImageFormatProtobuf.java | 1 +
.../hadoop/hdfs/server/namenode/TestFSImage.java | 61 +++++++++++
3 files changed, 142 insertions(+), 39 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 6212e65..22bb905 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -28,8 +28,9 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -204,15 +205,20 @@ public final class FSImageFormatPBINode {
private final FSDirectory dir;
private final FSNamesystem fsn;
private final FSImageFormatProtobuf.Loader parent;
- private ReentrantLock cacheNameMapLock;
- private ReentrantLock blockMapLock;
+
+ // Update blocks map by single thread asynchronously
+ private ExecutorService blocksMapUpdateExecutor;
+ // update name cache by single thread asynchronously.
+ private ExecutorService nameCacheUpdateExecutor;
Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
this.fsn = fsn;
this.dir = fsn.dir;
this.parent = parent;
- cacheNameMapLock = new ReentrantLock(true);
- blockMapLock = new ReentrantLock(true);
+ // Note: these executors must be SingleThreadExecutor, as they
+ // are used to modify structures which are not thread safe.
+ blocksMapUpdateExecutor = Executors.newSingleThreadExecutor();
+ nameCacheUpdateExecutor = Executors.newSingleThreadExecutor();
}
void loadINodeDirectorySectionInParallel(ExecutorService service,
@@ -263,7 +269,6 @@ public final class FSImageFormatPBINode {
void loadINodeDirectorySection(InputStream in) throws IOException {
final List<INodeReference> refList = parent.getLoaderContext()
.getRefList();
- ArrayList<INode> inodeList = new ArrayList<>();
while (true) {
INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
.parseDelimitedFrom(in);
@@ -274,15 +279,7 @@ public final class FSImageFormatPBINode {
INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
for (long id : e.getChildrenList()) {
INode child = dir.getInode(id);
- if (addToParent(p, child)) {
- if (child.isFile()) {
- inodeList.add(child);
- }
- if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
- addToCacheAndBlockMap(inodeList);
- inodeList.clear();
- }
- } else {
+ if (!addToParent(p, child)) {
LOG.warn("Failed to add the inode {} to the directory {}",
child.getId(), p.getId());
}
@@ -290,40 +287,79 @@ public final class FSImageFormatPBINode {
for (int refId : e.getRefChildrenList()) {
INodeReference ref = refList.get(refId);
- if (addToParent(p, ref)) {
- if (ref.isFile()) {
- inodeList.add(ref);
- }
- if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
- addToCacheAndBlockMap(inodeList);
- inodeList.clear();
- }
- } else {
+ if (!addToParent(p, ref)) {
LOG.warn("Failed to add the inode reference {} to the directory {}",
ref.getId(), p.getId());
}
}
}
- addToCacheAndBlockMap(inodeList);
}
- private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
- try {
- cacheNameMapLock.lock();
- for (INode i : inodeList) {
- dir.cacheName(i);
- }
- } finally {
- cacheNameMapLock.unlock();
+ private void fillUpInodeList(ArrayList<INode> inodeList, INode inode) {
+ if (inode.isFile()) {
+ inodeList.add(inode);
}
+ if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+ addToCacheAndBlockMap(inodeList);
+ inodeList.clear();
+ }
+ }
- try {
- blockMapLock.lock();
- for (INode i : inodeList) {
- updateBlocksMap(i.asFile(), fsn.getBlockManager());
+ private void addToCacheAndBlockMap(final ArrayList<INode> inodeList) {
+ final ArrayList<INode> inodes = new ArrayList<>(inodeList);
+ nameCacheUpdateExecutor.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ addToCacheInternal(inodes);
+ }
+ });
+ blocksMapUpdateExecutor.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ updateBlockMapInternal(inodes);
+ }
+ });
+ }
+
+ // update name cache with non-thread safe
+ private void addToCacheInternal(ArrayList<INode> inodeList) {
+ for (INode i : inodeList) {
+ dir.cacheName(i);
+ }
+ }
+
+ // update blocks map with non-thread safe
+ private void updateBlockMapInternal(ArrayList<INode> inodeList) {
+ for (INode i : inodeList) {
+ updateBlocksMap(i.asFile(), fsn.getBlockManager());
+ }
+ }
+
+ void waitBlocksMapAndNameCacheUpdateFinished() throws IOException {
+ long start = System.currentTimeMillis();
+ waitExecutorTerminated(blocksMapUpdateExecutor);
+ waitExecutorTerminated(nameCacheUpdateExecutor);
+ LOG.info("Completed update blocks map and name cache, total waiting "
+ + "duration {}ms.", (System.currentTimeMillis() - start));
+ }
+
+ private void waitExecutorTerminated(ExecutorService executorService)
+ throws IOException {
+ executorService.shutdown();
+ long start = System.currentTimeMillis();
+ while (!executorService.isTerminated()) {
+ try {
+ executorService.awaitTermination(1, TimeUnit.SECONDS);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting to executor service terminated duration {}ms.",
+ (System.currentTimeMillis() - start));
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted waiting for executor terminated.", e);
+ throw new IOException(e);
}
- } finally {
- blockMapLock.unlock();
}
}
@@ -340,6 +376,7 @@ public final class FSImageFormatPBINode {
// As the input stream is a LimitInputStream, the reading will stop when
// EOF is encountered at the end of the stream.
int cntr = 0;
+ ArrayList<INode> inodeList = new ArrayList<>();
while (true) {
INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
if (p == null) {
@@ -354,12 +391,16 @@ public final class FSImageFormatPBINode {
synchronized(this) {
dir.addToInodeMap(n);
}
+ fillUpInodeList(inodeList, n);
}
cntr++;
if (counter != null) {
counter.increment();
}
}
+ if (inodeList.size() > 0){
+ addToCacheAndBlockMap(inodeList);
+ }
return cntr;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index be21d1f..d34da5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -447,6 +447,7 @@ public final class FSImageFormatProtobuf {
} else {
inodeLoader.loadINodeDirectorySection(in);
}
+ inodeLoader.waitBlocksMapAndNameCacheUpdateFinished();
break;
case FILES_UNDERCONSTRUCTION:
inodeLoader.loadFilesUnderConstructionSection(in);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 793a749..39a0f15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
import java.util.EnumSet;
import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.NativeCodeLoader;
@@ -1152,4 +1154,63 @@ public class TestFSImage {
// The first sub-section and parent section should have the same offset
assertEquals(parent.getOffset(), subSec.get(0).getOffset());
}
+
+ @Test
+ public void testUpdateBlocksMapAndNameCacheAsync() throws IOException {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ FSDirectory fsdir = cluster.getNameNode().namesystem.getFSDirectory();
+ File workingDir = GenericTestUtils.getTestDir();
+
+ File preRestartTree = new File(workingDir, "preRestartTree");
+ File postRestartTree = new File(workingDir, "postRestartTree");
+
+ Path baseDir = new Path("/user/foo");
+ fs.mkdirs(baseDir);
+ fs.allowSnapshot(baseDir);
+ for (int i = 0; i < 5; i++) {
+ Path dir = new Path(baseDir, Integer.toString(i));
+ fs.mkdirs(dir);
+ for (int j = 0; j < 5; j++) {
+ Path file = new Path(dir, Integer.toString(j));
+ FSDataOutputStream os = fs.create(file);
+ os.write((byte) j);
+ os.close();
+ }
+ fs.createSnapshot(baseDir, "snap_"+i);
+ fs.rename(new Path(dir, "0"), new Path(dir, "renamed"));
+ }
+ SnapshotTestHelper.dumpTree2File(fsdir, preRestartTree);
+
+ // checkpoint
+ fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+ fs.saveNamespace();
+ fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+ cluster.restartNameNode();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ fsdir = cluster.getNameNode().namesystem.getFSDirectory();
+
+ // Ensure all the files created above exist, and blocks is correct.
+ for (int i = 0; i < 5; i++) {
+ Path dir = new Path(baseDir, Integer.toString(i));
+ assertTrue(fs.getFileStatus(dir).isDirectory());
+ for (int j = 0; j < 5; j++) {
+ Path file = new Path(dir, Integer.toString(j));
+ if (j == 0) {
+ file = new Path(dir, "renamed");
+ }
+ FSDataInputStream in = fs.open(file);
+ int n = in.readByte();
+ assertEquals(j, n);
+ in.close();
+ }
+ }
+ SnapshotTestHelper.dumpTree2File(fsdir, postRestartTree);
+ SnapshotTestHelper.compareDumpedTreeInFile(
+ preRestartTree, postRestartTree, true);
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org