Posted to common-commits@hadoop.apache.org by so...@apache.org on 2020/08/12 08:54:09 UTC

[hadoop] branch branch-3.3 updated: HDFS-15493. Update block map and name cache in parallel while loading fsimage. Contributed by Chengwei Wang

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 42154f0  HDFS-15493. Update block map and name cache in parallel while loading fsimage. Contributed by Chengwei Wang
42154f0 is described below

commit 42154f04e33b2f5d15b4629be71047db410b2759
Author: S O'Donnell <so...@cloudera.com>
AuthorDate: Wed Aug 12 09:02:47 2020 +0100

    HDFS-15493. Update block map and name cache in parallel while loading fsimage. Contributed by Chengwei Wang
    
    (cherry picked from commit 10716040a859b7127a4e1781be21c3b0b59dd456)
---
 .../hdfs/server/namenode/FSImageFormatPBINode.java | 119 ++++++++++++++-------
 .../server/namenode/FSImageFormatProtobuf.java     |   1 +
 .../hadoop/hdfs/server/namenode/TestFSImage.java   |  61 +++++++++++
 3 files changed, 142 insertions(+), 39 deletions(-)
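
For context: before this change the loader updated the name cache and the
blocks map inline, serialising access with two fair ReentrantLocks; the
patch instead batches file inodes and hands each batch to two
single-threaded executors, so the loading threads never block on those
updates. Below is a minimal, self-contained sketch of that pattern; every
name in it (SingleThreadedUpdateSketch, submitBatch, BATCH_SIZE) is
illustrative rather than an HDFS identifier, and it mirrors
fillUpInodeList() and addToCacheAndBlockMap() in the diff that follows.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class SingleThreadedUpdateSketch {
      private static final int BATCH_SIZE = 1000;

      public static void main(String[] args) throws InterruptedException {
        // Not thread safe, but only the executor's single thread mutates
        // it, so no lock is needed around the structure itself (this is
        // why the patch insists on newSingleThreadExecutor).
        Map<Long, String> blocksMap = new HashMap<>();
        ExecutorService updater = Executors.newSingleThreadExecutor();

        List<Long> batch = new ArrayList<>();
        for (long id = 0; id < 10_000; id++) {
          batch.add(id);
          if (batch.size() >= BATCH_SIZE) {
            submitBatch(updater, blocksMap, batch);
            batch.clear();
          }
        }
        if (!batch.isEmpty()) {
          submitBatch(updater, blocksMap, batch); // flush the last batch
        }

        // Same drain step as waitBlocksMapAndNameCacheUpdateFinished():
        // stop accepting work, then block until queued batches have run.
        updater.shutdown();
        if (!updater.awaitTermination(1, TimeUnit.MINUTES)) {
          throw new IllegalStateException("updater did not terminate");
        }
        System.out.println("entries: " + blocksMap.size()); // 10000
      }

      private static void submitBatch(ExecutorService updater,
          Map<Long, String> blocksMap, List<Long> batch) {
        // Copy the batch, because the caller reuses and clears its list
        // (addToCacheAndBlockMap in the diff does the same).
        final List<Long> copy = new ArrayList<>(batch);
        updater.submit(() -> {
          for (Long id : copy) {
            blocksMap.put(id, "block-" + id);
          }
        });
      }
    }

The sketch uses a lambda where the commit uses anonymous Runnable classes;
the behaviour is identical.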

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 6212e65..22bb905 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -28,8 +28,9 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -204,15 +205,20 @@ public final class FSImageFormatPBINode {
     private final FSDirectory dir;
     private final FSNamesystem fsn;
     private final FSImageFormatProtobuf.Loader parent;
-    private ReentrantLock cacheNameMapLock;
-    private ReentrantLock blockMapLock;
+
+    // Updates the blocks map asynchronously on a single thread.
+    private ExecutorService blocksMapUpdateExecutor;
+    // Updates the name cache asynchronously on a single thread.
+    private ExecutorService nameCacheUpdateExecutor;
 
     Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
       this.fsn = fsn;
       this.dir = fsn.dir;
       this.parent = parent;
-      cacheNameMapLock = new ReentrantLock(true);
-      blockMapLock = new ReentrantLock(true);
+      // Note: these executors must be SingleThreadExecutor, as they
+      // are used to modify structures which are not thread safe.
+      blocksMapUpdateExecutor = Executors.newSingleThreadExecutor();
+      nameCacheUpdateExecutor = Executors.newSingleThreadExecutor();
     }
 
     void loadINodeDirectorySectionInParallel(ExecutorService service,
@@ -263,7 +269,6 @@ public final class FSImageFormatPBINode {
     void loadINodeDirectorySection(InputStream in) throws IOException {
       final List<INodeReference> refList = parent.getLoaderContext()
           .getRefList();
-      ArrayList<INode> inodeList = new ArrayList<>();
       while (true) {
         INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
             .parseDelimitedFrom(in);
@@ -274,15 +279,7 @@ public final class FSImageFormatPBINode {
         INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
           INode child = dir.getInode(id);
-          if (addToParent(p, child)) {
-            if (child.isFile()) {
-              inodeList.add(child);
-            }
-            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
-              addToCacheAndBlockMap(inodeList);
-              inodeList.clear();
-            }
-          } else {
+          if (!addToParent(p, child)) {
             LOG.warn("Failed to add the inode {} to the directory {}",
                 child.getId(), p.getId());
           }
@@ -290,40 +287,79 @@ public final class FSImageFormatPBINode {
 
         for (int refId : e.getRefChildrenList()) {
           INodeReference ref = refList.get(refId);
-          if (addToParent(p, ref)) {
-            if (ref.isFile()) {
-              inodeList.add(ref);
-            }
-            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
-              addToCacheAndBlockMap(inodeList);
-              inodeList.clear();
-            }
-          } else {
+          if (!addToParent(p, ref)) {
             LOG.warn("Failed to add the inode reference {} to the directory {}",
                 ref.getId(), p.getId());
           }
         }
       }
-      addToCacheAndBlockMap(inodeList);
     }
 
-    private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
-      try {
-        cacheNameMapLock.lock();
-        for (INode i : inodeList) {
-          dir.cacheName(i);
-        }
-      } finally {
-        cacheNameMapLock.unlock();
+    private void fillUpInodeList(ArrayList<INode> inodeList, INode inode) {
+      if (inode.isFile()) {
+        inodeList.add(inode);
       }
+      if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+        addToCacheAndBlockMap(inodeList);
+        inodeList.clear();
+      }
+    }
 
-      try {
-        blockMapLock.lock();
-        for (INode i : inodeList) {
-          updateBlocksMap(i.asFile(), fsn.getBlockManager());
+    private void addToCacheAndBlockMap(final ArrayList<INode> inodeList) {
+      final ArrayList<INode> inodes = new ArrayList<>(inodeList);
+      nameCacheUpdateExecutor.submit(
+          new Runnable() {
+            @Override
+            public void run() {
+              addToCacheInternal(inodes);
+            }
+          });
+      blocksMapUpdateExecutor.submit(
+          new Runnable() {
+            @Override
+            public void run() {
+              updateBlockMapInternal(inodes);
+            }
+          });
+    }
+
+    // Update the name cache. Not thread safe; runs on a single thread.
+    private void addToCacheInternal(ArrayList<INode> inodeList) {
+      for (INode i : inodeList) {
+        dir.cacheName(i);
+      }
+    }
+
+    // Update the blocks map. Not thread safe; runs on a single thread.
+    private void updateBlockMapInternal(ArrayList<INode> inodeList) {
+      for (INode i : inodeList) {
+        updateBlocksMap(i.asFile(), fsn.getBlockManager());
+      }
+    }
+
+    void waitBlocksMapAndNameCacheUpdateFinished() throws IOException {
+      long start = System.currentTimeMillis();
+      waitExecutorTerminated(blocksMapUpdateExecutor);
+      waitExecutorTerminated(nameCacheUpdateExecutor);
+      LOG.info("Completed update blocks map and name cache, total waiting "
+          + "duration {}ms.", (System.currentTimeMillis() - start));
+    }
+
+    private void waitExecutorTerminated(ExecutorService executorService)
+        throws IOException {
+      executorService.shutdown();
+      long start = System.currentTimeMillis();
+      while (!executorService.isTerminated()) {
+        try {
+          executorService.awaitTermination(1, TimeUnit.SECONDS);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Waiting to executor service terminated duration {}ms.",
+                (System.currentTimeMillis() - start));
+          }
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted waiting for executor terminated.", e);
+          throw new IOException(e);
         }
-      } finally {
-        blockMapLock.unlock();
       }
     }
 
@@ -340,6 +376,7 @@ public final class FSImageFormatPBINode {
       // As the input stream is a LimitInputStream, the reading will stop when
       // EOF is encountered at the end of the stream.
       int cntr = 0;
+      ArrayList<INode> inodeList = new ArrayList<>();
       while (true) {
         INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
         if (p == null) {
@@ -354,12 +391,16 @@ public final class FSImageFormatPBINode {
           synchronized(this) {
             dir.addToInodeMap(n);
           }
+          fillUpInodeList(inodeList, n);
         }
         cntr++;
         if (counter != null) {
           counter.increment();
         }
       }
+      if (inodeList.size() > 0) {
+        addToCacheAndBlockMap(inodeList);
+      }
       return cntr;
     }
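
The shutdown-and-drain idiom in waitExecutorTerminated() above is what
makes the asynchrony safe: after shutdown() no new batches can be queued,
and awaitTermination() blocks until everything already queued has run, so
the caller sees fully populated structures. A standalone sketch of the
same idiom (DrainSketch and drain() are illustrative names, not HDFS API):

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class DrainSketch {
      // Stop accepting new tasks, then block until queued tasks finish.
      static void drain(ExecutorService service) throws IOException {
        service.shutdown();
        try {
          while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
            // Still draining; a caller could log progress here, as the
            // patch does at debug level.
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // keep the interrupt status
          throw new IOException("Interrupted while draining executor", e);
        }
      }

      public static void main(String[] args) throws IOException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.submit(() -> System.out.println("queued task ran"));
        drain(service); // the task is guaranteed to have run on return
      }
    }

One deliberate difference: the sketch loops on awaitTermination()'s return
value rather than polling isTerminated(), and restores the thread's
interrupt status before wrapping the InterruptedException; otherwise the
two loops are equivalent.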
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index be21d1f..d34da5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -447,6 +447,7 @@ public final class FSImageFormatProtobuf {
           } else {
             inodeLoader.loadINodeDirectorySection(in);
           }
+          inodeLoader.waitBlocksMapAndNameCacheUpdateFinished();
           break;
         case FILES_UNDERCONSTRUCTION:
           inodeLoader.loadFilesUnderConstructionSection(in);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 793a749..39a0f15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -1152,4 +1154,63 @@ public class TestFSImage {
     // The first sub-section and parent section should have the same offset
     assertEquals(parent.getOffset(), subSec.get(0).getOffset());
   }
+
+  @Test
+  public void testUpdateBlocksMapAndNameCacheAsync() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    FSDirectory fsdir = cluster.getNameNode().namesystem.getFSDirectory();
+    File workingDir = GenericTestUtils.getTestDir();
+
+    File preRestartTree = new File(workingDir, "preRestartTree");
+    File postRestartTree = new File(workingDir, "postRestartTree");
+
+    Path baseDir = new Path("/user/foo");
+    fs.mkdirs(baseDir);
+    fs.allowSnapshot(baseDir);
+    for (int i = 0; i < 5; i++) {
+      Path dir = new Path(baseDir, Integer.toString(i));
+      fs.mkdirs(dir);
+      for (int j = 0; j < 5; j++) {
+        Path file = new Path(dir, Integer.toString(j));
+        FSDataOutputStream os = fs.create(file);
+        os.write((byte) j);
+        os.close();
+      }
+      fs.createSnapshot(baseDir, "snap_" + i);
+      fs.rename(new Path(dir, "0"), new Path(dir, "renamed"));
+    }
+    SnapshotTestHelper.dumpTree2File(fsdir, preRestartTree);
+
+    // checkpoint
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+    cluster.restartNameNode();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fsdir = cluster.getNameNode().namesystem.getFSDirectory();
+
+    // Ensure all the files created above exist and their blocks are correct.
+    for (int i = 0; i < 5; i++) {
+      Path dir = new Path(baseDir, Integer.toString(i));
+      assertTrue(fs.getFileStatus(dir).isDirectory());
+      for (int j = 0; j < 5; j++) {
+        Path file = new Path(dir, Integer.toString(j));
+        if (j == 0) {
+          file = new Path(dir, "renamed");
+        }
+        FSDataInputStream in = fs.open(file);
+        int n = in.readByte();
+        assertEquals(j, n);
+        in.close();
+      }
+    }
+    SnapshotTestHelper.dumpTree2File(fsdir, postRestartTree);
+    SnapshotTestHelper.compareDumpedTreeInFile(
+        preRestartTree, postRestartTree, true);
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org