Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/09/17 16:35:07 UTC

[hadoop] branch branch-2 updated: HDFS-14771. Backport HDFS-14617 to branch-2 (Improve fsimage load time by writing sub-sections to the fsimage index). Contributed by He Xiaoqiao and Stephen O'Donnell.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 6fe8261  HDFS-14771. Backport HDFS-14617 to branch-2 (Improve fsimage load time by writing sub-sections to the fsimage index). Contributed by He Xiaoqiao and Stephen O'Donnell.
6fe8261 is described below

commit 6fe8261fa0f760db0f34fdc01786fdec99f491c4
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Tue Sep 17 09:32:58 2019 -0700

    HDFS-14771. Backport HDFS-14617 to branch-2 (Improve fsimage load time by writing sub-sections to the fsimage index). Contributed by He Xiaoqiao and Stephen O'Donnell.
    
    Reviewed-by: Stephen O'Donnell <so...@cloudera.com>
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  16 ++
 .../hadoop/hdfs/server/namenode/FSImage.java       |   3 +-
 .../hdfs/server/namenode/FSImageFormatPBINode.java | 244 ++++++++++++++++++--
 .../server/namenode/FSImageFormatProtobuf.java     | 252 ++++++++++++++++++++-
 .../namenode/snapshot/FSImageFormatPBSnapshot.java |   9 +-
 .../src/main/resources/hdfs-default.xml            |  56 +++++
 .../hdfs/server/namenode/FSImageTestUtil.java      |  25 +-
 .../hadoop/hdfs/server/namenode/TestFSImage.java   | 152 +++++++++++++
 .../server/namenode/TestFSImageWithSnapshot.java   |   3 +-
 9 files changed, 729 insertions(+), 31 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 23198ac..56b0e9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -707,6 +707,22 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
   public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
 
+  public static final String DFS_IMAGE_PARALLEL_LOAD_KEY =
+      "dfs.image.parallel.load";
+  public static final boolean DFS_IMAGE_PARALLEL_LOAD_DEFAULT = false;
+
+  public static final String DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY =
+      "dfs.image.parallel.target.sections";
+  public static final int DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT = 12;
+
+  public static final String DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY =
+      "dfs.image.parallel.inode.threshold";
+  public static final int DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT = 1000000;
+
+  public static final String DFS_IMAGE_PARALLEL_THREADS_KEY =
+      "dfs.image.parallel.threads";
+  public static final int DFS_IMAGE_PARALLEL_THREADS_DEFAULT = 4;
+
   public static final String DFS_QJM_OPERATIONS_TIMEOUT =
       "dfs.qjm.operations.timeout";
   public static final long DFS_QJM_OPERATIONS_TIMEOUT_DEFAULT = 60000;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 26ec48f..86cc6a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -982,7 +982,8 @@ public class FSImage implements Closeable {
     File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
     File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
     
-    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
+    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
+        conf);
     FSImageCompression compression = FSImageCompression.createCompression(conf);
     long numErrors = saver.save(newFile, compression);
     if (numErrors > 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index deb697d..1e03799 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -25,6 +25,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,6 +75,8 @@ import com.google.protobuf.ByteString;
 public final class FSImageFormatPBINode {
   private static final Log LOG = LogFactory.getLog(FSImageFormatPBINode.class);
 
+  private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000;
+
   public static final int ACL_ENTRY_NAME_MASK = (1 << 24) - 1;
   public static final int ACL_ENTRY_NAME_OFFSET = 6;
   public static final int ACL_ENTRY_TYPE_OFFSET = 3;
@@ -192,16 +199,69 @@ public final class FSImageFormatPBINode {
     private final FSDirectory dir;
     private final FSNamesystem fsn;
     private final FSImageFormatProtobuf.Loader parent;
+    private ReentrantLock cacheNameMapLock;
+    private ReentrantLock blockMapLock;
 
     Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
       this.fsn = fsn;
       this.dir = fsn.dir;
       this.parent = parent;
+      cacheNameMapLock = new ReentrantLock(true);
+      blockMapLock = new ReentrantLock(true);
+    }
+
+    void loadINodeDirectorySectionInParallel(ExecutorService service,
+        ArrayList<FileSummary.Section> sections, final String compressionCodec)
+        throws IOException {
+      LOG.info("Loading the INodeDirectory section in parallel with "
+          + sections.size() + " sub-sections");
+      final CountDownLatch latch = new CountDownLatch(sections.size());
+      final CopyOnWriteArrayList<IOException> exceptions =
+          new CopyOnWriteArrayList<>();
+      for (final FileSummary.Section s : sections) {
+        service.submit(new Runnable() {
+          @Override
+          public void run() {
+            InputStream ins = null;
+            try {
+              ins = parent.getInputStreamForSection(s,
+                  compressionCodec);
+              Loader.this.loadINodeDirectorySection(ins);
+            } catch (Exception e) {
+              LOG.error("An exception occurred loading INodeDirectories in " +
+                  "parallel", e);
+              exceptions.add(new IOException(e));
+            } finally {
+              latch.countDown();
+              try {
+                if (ins != null) {
+                  ins.close();
+                }
+              } catch (IOException ioe) {
+                LOG.warn("Failed to close the input stream, ignoring", ioe);
+              }
+            }
+          }
+        });
+      }
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted waiting for countdown latch", e);
+        throw new IOException(e);
+      }
+      if (exceptions.size() != 0) {
+        LOG.error(exceptions.size() + " exceptions occurred loading "
+            + "INodeDirectories");
+        throw exceptions.get(0);
+      }
+      LOG.info("Completed loading all INodeDirectory sub-sections");
     }
 
     void loadINodeDirectorySection(InputStream in) throws IOException {
       final List<INodeReference> refList = parent.getLoaderContext()
           .getRefList();
+      ArrayList<INode> inodeList = new ArrayList<>();
       while (true) {
         INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
             .parseDelimitedFrom(in);
@@ -212,33 +272,161 @@ public final class FSImageFormatPBINode {
         INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
           INode child = dir.getInode(id);
-          addToParent(p, child);
+          if (addToParent(p, child)) {
+            if (child.isFile()) {
+              inodeList.add(child);
+            }
+            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+              addToCacheAndBlockMap(inodeList);
+              inodeList.clear();
+            }
+          } else {
+            LOG.warn("Failed to add the inode " + child.getId()
+                + "to the directory " + p.getId());
+          }
         }
         for (int refId : e.getRefChildrenList()) {
           INodeReference ref = refList.get(refId);
-          addToParent(p, ref);
+          if (addToParent(p, ref)) {
+            if (ref.isFile()) {
+              inodeList.add(ref);
+            }
+            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+              addToCacheAndBlockMap(inodeList);
+              inodeList.clear();
+            }
+          } else {
+            LOG.warn("Failed to add the inode reference " + ref.getId()
+                + " to the directory " + p.getId());
+          }
+        }
+      }
+      addToCacheAndBlockMap(inodeList);
+    }
+
+    private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
+      try {
+        cacheNameMapLock.lock();
+        for (INode i : inodeList) {
+          dir.cacheName(i);
+        }
+      } finally {
+        cacheNameMapLock.unlock();
+      }
+
+      try {
+        blockMapLock.lock();
+        for (INode i : inodeList) {
+          updateBlocksMap(i.asFile(), fsn.getBlockManager());
         }
+      } finally {
+        blockMapLock.unlock();
       }
     }
 
     void loadINodeSection(InputStream in, StartupProgress prog,
         Step currentStep) throws IOException {
-      INodeSection s = INodeSection.parseDelimitedFrom(in);
-      fsn.dir.resetLastInodeId(s.getLastInodeId());
-      long numInodes = s.getNumInodes();
-      LOG.info("Loading " + numInodes + " INodes.");
-      prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+      loadINodeSectionHeader(in, prog, currentStep);
       Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
-      for (int i = 0; i < numInodes; ++i) {
+      int totalLoaded = loadINodesInSection(in, counter);
+      LOG.info("Successfully loaded " + totalLoaded + " inodes");
+    }
+
+    private int loadINodesInSection(InputStream in, Counter counter)
+        throws IOException {
+      // As the input stream is a LimitInputStream, the reading will stop when
+      // EOF is encountered at the end of the stream.
+      int cntr = 0;
+      while (true) {
         INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+        if (p == null) {
+          break;
+        }
         if (p.getId() == INodeId.ROOT_INODE_ID) {
-          loadRootINode(p);
+          synchronized(this) {
+            loadRootINode(p);
+          }
         } else {
           INode n = loadINode(p);
-          dir.addToInodeMap(n);
+          synchronized(this) {
+            dir.addToInodeMap(n);
+          }
+        }
+        cntr++;
+        if (counter != null) {
+          counter.increment();
         }
-        counter.increment();
       }
+      return cntr;
+    }
+
+    private long loadINodeSectionHeader(InputStream in, StartupProgress prog,
+        Step currentStep) throws IOException {
+      INodeSection s = INodeSection.parseDelimitedFrom(in);
+      fsn.dir.resetLastInodeId(s.getLastInodeId());
+      long numInodes = s.getNumInodes();
+      LOG.info("Loading " + numInodes + " INodes.");
+      prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+      return numInodes;
+    }
+
+    void loadINodeSectionInParallel(ExecutorService service,
+        ArrayList<FileSummary.Section> sections,
+        String compressionCodec, final StartupProgress prog,
+        final Step currentStep) throws IOException {
+      LOG.info("Loading the INode section in parallel with "
+          + sections.size() + " sub-sections");
+      long expectedInodes = 0;
+      final CountDownLatch latch = new CountDownLatch(sections.size());
+      final AtomicInteger totalLoaded = new AtomicInteger(0);
+      final CopyOnWriteArrayList<IOException> exceptions =
+          new CopyOnWriteArrayList<>();
+
+      for (int i=0; i < sections.size(); i++) {
+        FileSummary.Section s = sections.get(i);
+        final InputStream ins = parent.getInputStreamForSection(s,
+            compressionCodec);
+        if (i == 0) {
+          // The first inode section has a header which must be processed first
+          expectedInodes = loadINodeSectionHeader(ins, prog, currentStep);
+        }
+        service.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              totalLoaded.addAndGet(Loader.this.loadINodesInSection(ins, null));
+              prog.setCount(Phase.LOADING_FSIMAGE, currentStep,
+                  totalLoaded.get());
+            } catch (Exception e) {
+              LOG.error("An exception occurred loading INodes in parallel", e);
+              exceptions.add(new IOException(e));
+            } finally {
+              latch.countDown();
+              try {
+                ins.close();
+              } catch (IOException ioe) {
+                LOG.warn("Failed to close the input stream, ignoring", ioe);
+              }
+            }
+          }
+        });
+      }
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted waiting for countdown latch");
+      }
+      if (exceptions.size() != 0) {
+        LOG.error(exceptions.size() + " exceptions occurred loading INodes");
+        throw exceptions.get(0);
+      }
+      if (totalLoaded.get() != expectedInodes) {
+        throw new IOException("Expected to load "+expectedInodes+" in " +
+            "parallel, but loaded "+totalLoaded.get()+". The image may " +
+            "be corrupt.");
+      }
+      LOG.info("Completed loading all INode sections. Loaded "
+          + totalLoaded.get() +" inodes.");
     }
 
     /**
@@ -256,22 +444,18 @@ public final class FSImageFormatPBINode {
       }
     }
 
-    private void addToParent(INodeDirectory parent, INode child) {
-      if (parent == dir.rootDir && FSDirectory.isReservedName(child)) {
+    private boolean addToParent(INodeDirectory parentDir, INode child) {
+      if (parentDir == dir.rootDir && FSDirectory.isReservedName(child)) {
         throw new HadoopIllegalArgumentException("File name \""
             + child.getLocalName() + "\" is reserved. Please "
             + " change the name of the existing file or directory to another "
             + "name before upgrading to this release.");
       }
       // NOTE: This does not update space counts for parents
-      if (!parent.addChild(child)) {
-        return;
-      }
-      dir.cacheName(child);
-
-      if (child.isFile()) {
-        updateBlocksMap(child.asFile(), fsn.getBlockManager());
+      if (!parentDir.addChild(child)) {
+        return false;
       }
+      return true;
     }
 
     private INode loadINode(INodeSection.INode n) {
@@ -488,6 +672,7 @@ public final class FSImageFormatPBINode {
       final ArrayList<INodeReference> refList = parent.getSaverContext()
           .getRefList();
       int i = 0;
+      int outputInodes = 0;
       while (iter.hasNext()) {
         INodeWithAdditionalFields n = iter.next();
         if (!n.isDirectory()) {
@@ -517,6 +702,7 @@ public final class FSImageFormatPBINode {
               refList.add(inode.asReference());
               b.addRefChildren(refList.size() - 1);
             }
+            outputInodes++;
           }
           INodeDirectorySection.DirEntry e = b.build();
           e.writeDelimitedTo(out);
@@ -526,9 +712,15 @@ public final class FSImageFormatPBINode {
         if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
           context.checkCancelled();
         }
+        if (outputInodes >= parent.getInodesPerSubSection()) {
+          outputInodes = 0;
+          parent.commitSubSection(summary,
+              FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
+        }
       }
-      parent.commitSection(summary,
-          FSImageFormatProtobuf.SectionName.INODE_DIR);
+      parent.commitSectionAndSubSection(summary,
+          FSImageFormatProtobuf.SectionName.INODE_DIR,
+          FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
     }
 
     void serializeINodeSection(OutputStream out) throws IOException {
@@ -548,8 +740,14 @@ public final class FSImageFormatPBINode {
         if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
           context.checkCancelled();
         }
+        if (i % parent.getInodesPerSubSection() == 0) {
+          parent.commitSubSection(summary,
+              FSImageFormatProtobuf.SectionName.INODE_SUB);
+        }
       }
-      parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
+      parent.commitSectionAndSubSection(summary,
+          FSImageFormatProtobuf.SectionName.INODE,
+          FSImageFormatProtobuf.SectionName.INODE_SUB);
     }
 
     void serializeFilesUCSection(OutputStream out) throws IOException {
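
The loader methods added above follow a scatter-gather pattern: one task per sub-section submitted to an ExecutorService, a CountDownLatch to wait for completion, and a CopyOnWriteArrayList to collect failures that are re-thrown on the calling thread afterwards. Below is a minimal standalone sketch of that pattern; the SectionReader interface and all names are illustrative and are not part of this patch.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;

    // Illustrative sketch only -- mirrors the shape of
    // loadINodeDirectorySectionInParallel above, not the actual HDFS classes.
    final class ParallelSectionLoaderSketch {

      /** Hypothetical callback: opens and loads a single sub-section. */
      interface SectionReader {
        InputStream open(long offset, long length) throws IOException;
        void load(InputStream in) throws IOException;
      }

      static void loadAll(ExecutorService pool, List<long[]> sections,
          final SectionReader reader) throws IOException {
        final CountDownLatch latch = new CountDownLatch(sections.size());
        final CopyOnWriteArrayList<IOException> failures =
            new CopyOnWriteArrayList<IOException>();
        for (final long[] s : sections) {         // s[0] = offset, s[1] = length
          pool.submit(new Runnable() {
            @Override
            public void run() {
              InputStream in = null;
              try {
                in = reader.open(s[0], s[1]);
                reader.load(in);                  // each task loads one sub-section
              } catch (Exception e) {
                failures.add(new IOException(e)); // record, let the others finish
              } finally {
                latch.countDown();
                try {
                  if (in != null) {
                    in.close();
                  }
                } catch (IOException ioe) {
                  // ignore close failures, as the loader above does
                }
              }
            }
          });
        }
        try {
          latch.await();                          // wait for every sub-section
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException(e);
        }
        if (!failures.isEmpty()) {
          throw failures.get(0);                  // surface the first failure
        }
      }
    }

Collecting the failures rather than throwing from the worker threads means every task still runs to completion, and the first error is reported back on the calling thread once all sub-sections have been attempted.
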
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index 040f131..a492a1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -36,10 +36,14 @@ import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,6 +149,8 @@ public final class FSImageFormatProtobuf {
      */
     private final boolean requireSameLayoutVersion;
 
+    private File filename;
+
     Loader(Configuration conf, FSNamesystem fsn,
         boolean requireSameLayoutVersion) {
       this.conf = conf;
@@ -224,6 +230,7 @@ public final class FSImageFormatProtobuf {
     }
 
     void load(File file) throws IOException {
+      filename = file;
       long start = Time.monotonicNow();
       DigestThread dt = new DigestThread(file);
       dt.start();
@@ -245,6 +252,96 @@ public final class FSImageFormatProtobuf {
       }
     }
 
+    /**
+     * Given an FSImage FileSummary.Section, return a LimitInputStream set to
+     * the starting position of the section and limited to the section length.
+     * @param section The FileSummary.Section containing the offset and length
+     * @param compressionCodec The compression codec in use, if any
+     * @return An InputStream for the given section
+     * @throws IOException
+     */
+    public InputStream getInputStreamForSection(FileSummary.Section section,
+                                                String compressionCodec)
+        throws IOException {
+      FileInputStream fin = new FileInputStream(filename);
+      FileChannel channel = fin.getChannel();
+      channel.position(section.getOffset());
+      InputStream in = new BufferedInputStream(new LimitInputStream(fin,
+          section.getLength()));
+
+      in = FSImageUtil.wrapInputStreamForCompression(conf,
+          compressionCodec, in);
+      return in;
+    }
+
+    /**
+     * Takes an ArrayList of Sections and removes all Sections whose
+     * name ends in _SUB, indicating they are sub-sections. The original
+     * ArrayList is modified and a new list of the removed Sections is
+     * returned.
+     * @param sections ArrayList containing all sections and sub-sections
+     *                 in the image.
+     * @return ArrayList of the sections removed, or an empty list if none are
+     *         removed.
+     */
+    private ArrayList<FileSummary.Section> getAndRemoveSubSections(
+        ArrayList<FileSummary.Section> sections) {
+      ArrayList<FileSummary.Section> subSections = new ArrayList<>();
+      Iterator<FileSummary.Section> iter = sections.iterator();
+      while (iter.hasNext()) {
+        FileSummary.Section s = iter.next();
+        String name = s.getName();
+        if (name.matches(".*_SUB$")) {
+          subSections.add(s);
+          iter.remove();
+        }
+      }
+      return subSections;
+    }
+
+    /**
+     * Given an ArrayList of Sections, return all Sections with the given
+     * name, or an empty list if none are found.
+     * @param sections ArrayList of the Sections to search through
+     * @param name The name of the Sections to search for
+     * @return ArrayList of the sections matching the given name
+     */
+    private ArrayList<FileSummary.Section> getSubSectionsOfName(
+        ArrayList<FileSummary.Section> sections, SectionName name) {
+      ArrayList<FileSummary.Section> subSec = new ArrayList<>();
+      for (FileSummary.Section s : sections) {
+        String n = s.getName();
+        SectionName sectionName = SectionName.fromString(n);
+        if (sectionName == name) {
+          subSec.add(s);
+        }
+      }
+      return subSec;
+    }
+
+    /**
+     * Checks the number of threads configured for parallel loading and
+     * returns an ExecutorService with the configured number of threads. If
+     * the thread count is set to less than 1, it is reset to the default
+     * value.
+     * @return ExecutorService with the correct number of threads
+     */
+    private ExecutorService getParallelExecutorService() {
+      int threads = conf.getInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
+          DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
+      if (threads < 1) {
+        LOG.warn("Parallel is enabled and {} is set to {}. Setting to the " +
+            "default value {}", DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
+            threads, DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
+        threads = DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT;
+      }
+      ExecutorService executorService = Executors.newFixedThreadPool(
+          threads);
+      LOG.info("The fsimage will be loaded in parallel using {} threads",
+          threads);
+      return executorService;
+    }
+
     private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
         throws IOException {
       if (!FSImageUtil.checkFileFormat(raFile)) {
@@ -290,6 +387,15 @@ public final class FSImageFormatProtobuf {
        */
       Step currentStep = null;
 
+      boolean loadInParallel = enableParallelSaveAndLoad(conf);
+
+      ExecutorService executorService = null;
+      ArrayList<FileSummary.Section> subSections =
+          getAndRemoveSubSections(sections);
+      if (loadInParallel) {
+        executorService = getParallelExecutorService();
+      }
+
       for (FileSummary.Section s : sections) {
         channel.position(s.getOffset());
         InputStream in = new BufferedInputStream(new LimitInputStream(fin,
@@ -300,6 +406,7 @@ public final class FSImageFormatProtobuf {
 
         String n = s.getName();
 
+        ArrayList<FileSummary.Section> stageSubSections;
         switch (SectionName.fromString(n)) {
         case NS_INFO:
           loadNameSystemSection(in);
@@ -310,14 +417,28 @@ public final class FSImageFormatProtobuf {
         case INODE: {
           currentStep = new Step(StepType.INODES);
           prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
-          inodeLoader.loadINodeSection(in, prog, currentStep);
+          stageSubSections = getSubSectionsOfName(
+              subSections, SectionName.INODE_SUB);
+          if (loadInParallel && (stageSubSections.size() > 0)) {
+            inodeLoader.loadINodeSectionInParallel(executorService,
+                stageSubSections, summary.getCodec(), prog, currentStep);
+          } else {
+            inodeLoader.loadINodeSection(in, prog, currentStep);
+          }
         }
           break;
         case INODE_REFERENCE:
           snapshotLoader.loadINodeReferenceSection(in);
           break;
         case INODE_DIR:
-          inodeLoader.loadINodeDirectorySection(in);
+          stageSubSections = getSubSectionsOfName(
+              subSections, SectionName.INODE_DIR_SUB);
+          if (loadInParallel && stageSubSections.size() > 0) {
+            inodeLoader.loadINodeDirectorySectionInParallel(executorService,
+                stageSubSections, summary.getCodec());
+          } else {
+            inodeLoader.loadINodeDirectorySection(in);
+          }
           break;
         case FILES_UNDERCONSTRUCTION:
           inodeLoader.loadFilesUnderConstructionSection(in);
@@ -348,6 +469,9 @@ public final class FSImageFormatProtobuf {
           break;
         }
       }
+      if (executorService != null) {
+        executorService.shutdown();
+      }
     }
 
     private void loadNameSystemSection(InputStream in) throws IOException {
@@ -421,12 +545,34 @@ public final class FSImageFormatProtobuf {
 
   }
 
+  private static boolean enableParallelSaveAndLoad(Configuration conf) {
+    boolean loadInParallel =
+        conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
+            DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
+    boolean compressionEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
+        DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
+
+    if (loadInParallel) {
+      if (compressionEnabled) {
+        LOG.warn("Parallel Image loading and saving is not supported when {}" +
+                " is set to true. Parallel will be disabled.",
+            DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY);
+        loadInParallel = false;
+      }
+    }
+    return loadInParallel;
+  }
+
   public static final class Saver {
     public static final int CHECK_CANCEL_INTERVAL = 4096;
+    private boolean writeSubSections = false;
+    private int inodesPerSubSection = Integer.MAX_VALUE;
 
     private final SaveNamespaceContext context;
     private final SaverContext saverContext;
     private long currentOffset = FSImageUtil.MAGIC_HEADER.length;
+    private long subSectionOffset = currentOffset;
     private MD5Hash savedDigest;
 
     private FileChannel fileChannel;
@@ -434,10 +580,12 @@ public final class FSImageFormatProtobuf {
     private OutputStream sectionOutputStream;
     private CompressionCodec codec;
     private OutputStream underlyingOutputStream;
+    private Configuration conf;
 
-    Saver(SaveNamespaceContext context) {
+    Saver(SaveNamespaceContext context, Configuration conf) {
       this.context = context;
       this.saverContext = new SaverContext();
+      this.conf = conf;
     }
 
     public MD5Hash getSavedDigest() {
@@ -452,6 +600,29 @@ public final class FSImageFormatProtobuf {
       return saverContext;
     }
 
+    public int getInodesPerSubSection() {
+      return inodesPerSubSection;
+    }
+
+    public boolean shouldWriteSubSections() {
+      return writeSubSections;
+    }
+
+    /**
+     * Commit the length and offset of an fsimage section to the summary index,
+     * including the sub-section, which is committed before the section itself
+     * is committed.
+     * @param summary The image summary object
+     * @param name The name of the section to commit
+     * @param subSectionName The name of the sub-section to commit
+     * @throws IOException
+     */
+    public void commitSectionAndSubSection(FileSummary.Builder summary,
+        SectionName name, SectionName subSectionName) throws IOException {
+      commitSubSection(summary, subSectionName);
+      commitSection(summary, name);
+    }
+
     public void commitSection(FileSummary.Builder summary, SectionName name)
         throws IOException {
       long oldOffset = currentOffset;
@@ -466,6 +637,35 @@ public final class FSImageFormatProtobuf {
       summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
           .setLength(length).setOffset(currentOffset));
       currentOffset += length;
+      subSectionOffset = currentOffset;
+    }
+
+    /**
+     * Commit the length and offset of an fsimage sub-section to the summary
+     * index.
+     * @param summary The image summary object
+     * @param name The name of the sub-section to commit
+     * @throws IOException
+     */
+    public void commitSubSection(FileSummary.Builder summary, SectionName name)
+        throws IOException {
+      if (!writeSubSections) {
+        return;
+      }
+
+      LOG.debug("Saving a subsection for {}", name.toString());
+      // The output stream must be flushed before the length is obtained
+      // as the flush can move the length forward.
+      sectionOutputStream.flush();
+      long length = fileChannel.position() - subSectionOffset;
+      if (length == 0) {
+        LOG.warn("The requested section for {} is empty. It will not be " +
+            "output to the image", name.toString());
+        return;
+      }
+      summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
+          .setLength(length).setOffset(subSectionOffset));
+      subSectionOffset += length;
     }
 
     private void flushSectionOutputStream() throws IOException {
@@ -480,6 +680,7 @@ public final class FSImageFormatProtobuf {
      * @throws IOException on fatal error.
      */
     long save(File file, FSImageCompression compression) throws IOException {
+      enableSubSectionsIfRequired();
       FileOutputStream fout = new FileOutputStream(file);
       fileChannel = fout.getChannel();
       try {
@@ -496,6 +697,47 @@ public final class FSImageFormatProtobuf {
       }
     }
 
+    private void enableSubSectionsIfRequired() {
+      boolean parallelEnabled = enableParallelSaveAndLoad(conf);
+      int inodeThreshold = conf.getInt(
+          DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
+          DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
+      int targetSections = conf.getInt(
+          DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
+          DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
+
+      if (parallelEnabled) {
+        if (targetSections <= 0) {
+          LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
+                  " default of {}",
+              DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
+              targetSections,
+              DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
+          targetSections =
+              DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT;
+        }
+        if (inodeThreshold <= 0) {
+          LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
+                  " default of {}",
+              DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
+              inodeThreshold,
+              DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
+          inodeThreshold =
+              DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT;
+        }
+        int inodeCount = context.getSourceNamesystem().dir.getInodeMapSize();
+        // Only enable parallel sections if there are enough inodes
+        if (inodeCount >= inodeThreshold) {
+          writeSubSections = true;
+          // Calculate the inodes per sub-section, rounding up (ceiling division)
+          inodesPerSubSection = (inodeCount + targetSections - 1) /
+              targetSections;
+        }
+      } else {
+        writeSubSections = false;
+      }
+    }
+
     private static void saveFileSummary(OutputStream out, FileSummary summary)
         throws IOException {
       summary.writeDelimitedTo(out);
@@ -683,11 +925,15 @@ public final class FSImageFormatProtobuf {
     STRING_TABLE("STRING_TABLE"),
     EXTENDED_ACL("EXTENDED_ACL"),
     INODE("INODE"),
+    INODE_SUB("INODE_SUB"),
     INODE_REFERENCE("INODE_REFERENCE"),
+    INODE_REFERENCE_SUB("INODE_REFERENCE_SUB"),
     SNAPSHOT("SNAPSHOT"),
     INODE_DIR("INODE_DIR"),
+    INODE_DIR_SUB("INODE_DIR_SUB"),
     FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
     SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
+    SNAPSHOT_DIFF_SUB("SNAPSHOT_DIFF_SUB"),
     SECRET_MANAGER("SECRET_MANAGER"),
     CACHE_MANAGER("CACHE_MANAGER");
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index deedbf6..d15c13c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@ -520,9 +520,14 @@ public class FSImageFormatPBSnapshot {
         if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
           context.checkCancelled();
         }
+        if (i % parent.getInodesPerSubSection() == 0) {
+          parent.commitSubSection(headers,
+              FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
+        }
       }
-      parent.commitSection(headers,
-          FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF);
+      parent.commitSectionAndSubSection(headers,
+          FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF,
+          FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
     }
 
     private void serializeFileDiffList(INodeFile file, OutputStream out)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index fae39ed..2754a42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1206,6 +1206,62 @@
 </property>
 
 <property>
+  <name>dfs.image.parallel.load</name>
+  <value>false</value>
+  <description>
+    If true, write sub-section entries to the fsimage index so it can
+    be loaded in parallel. Also controls whether parallel loading
+    will be used for an image previously created with sub-sections.
+    If the image contains sub-sections and this is set to false,
+    parallel loading will not be used.
+    Parallel loading is not compatible with image compression,
+    so if dfs.image.compress is set to true this setting will be
+    ignored and no parallel loading will occur.
+    Enabling this feature may impact rolling upgrades and downgrades if
+    the previous version does not support this feature. If the feature was
+    enabled and a downgrade is required, first set this parameter to
+    false, save the namespace to create an fsimage with no sub-sections,
+    and then perform the downgrade.
+  </description>
+</property>
+
+<property>
+  <name>dfs.image.parallel.target.sections</name>
+  <value>12</value>
+  <description>
+    Controls the number of sub-sections that will be written to
+    fsimage for each section. This should be larger than
+    dfs.image.parallel.threads, otherwise not all threads will be
+    used when loading. Ideally, have at least twice as many target
+    sections as threads, so each thread must load more than one
+    section and a single long-running section does not dominate
+    the load time.
+  </description>
+</property>
+
+<property>
+  <name>dfs.image.parallel.inode.threshold</name>
+  <value>1000000</value>
+  <description>
+    If the image contains fewer inodes than this setting, do not
+    write sub-sections, and hence disable parallel loading, because
+    small images load very quickly in serial and parallel loading
+    is not needed.
+  </description>
+</property>
+
+<property>
+  <name>dfs.image.parallel.threads</name>
+  <value>4</value>
+  <description>
+    The number of threads to use when dfs.image.parallel.load is
+    enabled. This setting should be less than
+    dfs.image.parallel.target.sections. The optimal number of
+    threads will depend on the hardware and environment.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.delegation.key.update-interval</name>
   <value>86400000</value>
   <description>The update interval for master key for delegation tokens 
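
For reference, here is one way an operator might enable the feature programmatically, following the guidance in the descriptions above (keep the target-section count comfortably above the thread count, and leave image compression off). This is a hedged sketch: the values simply mirror the defaults above and the class name is illustrative, not a recommendation for any particular cluster; the same settings can equally be placed in hdfs-site.xml.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    // Illustrative sketch only: builds a Configuration with parallel fsimage
    // loading enabled, using the new keys introduced by this patch.
    public class ParallelImageLoadConfigSketch {
      public static Configuration withParallelImageLoad() {
        Configuration conf = new Configuration();
        conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, true);
        conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, 4);
        conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, 12);
        conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, 1000000);
        // Parallel loading is ignored when dfs.image.compress is true, so it
        // is left at its default of false here.
        return conf;
      }
    }
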
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
index 5e00792..888a949 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
@@ -626,4 +626,27 @@ public abstract class FSImageTestUtil {
         getStorageDirectory(storageUri);
     return NNStorage.readTransactionIdFile(sDir);
   }
-}
+
+  /**
+   * Returns the summary section from the latest fsimage stored on the cluster.
+   * This is effectively the image index which contains the offset of each
+   * section and subsection.
+   * @param cluster The cluster to load the image from
+   * @return The FileSummary section of the fsimage
+   * @throws IOException
+   */
+  public static FsImageProto.FileSummary getLatestImageSummary(
+      MiniDFSCluster cluster) throws IOException {
+    RandomAccessFile raFile = null;
+    try {
+      File image = FSImageTestUtil.findLatestImageFile(FSImageTestUtil
+          .getFSImage(cluster.getNameNode()).getStorage().getStorageDir(0));
+      raFile = new RandomAccessFile(image, "r");
+      return FSImageUtil.loadSummary(raFile);
+    } finally {
+      if (raFile != null) {
+        raFile.close();
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index cbfc258..62141aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -23,8 +23,10 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Assume;
 
@@ -46,6 +48,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary.Section;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
@@ -314,4 +318,152 @@ public class TestFSImage {
       FileUtil.fullyDelete(dfsDir);
     }
   }
+
+  private ArrayList<Section> getSubSectionsOfName(ArrayList<Section> sections,
+      FSImageFormatProtobuf.SectionName name) {
+    ArrayList<Section> subSec = new ArrayList<>();
+    for (Section s : sections) {
+      if (s.getName().equals(name.toString())) {
+        subSec.add(s);
+      }
+    }
+    return subSec;
+  }
+
+  private MiniDFSCluster createAndLoadParallelFSImage(Configuration conf)
+      throws IOException {
+    conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true");
+    conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1");
+    conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4");
+    conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4");
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+
+    // Create 10 directories, each containing 5 files
+    String baseDir = "/abc/def";
+    for (int i=0; i<10; i++) {
+      Path dir = new Path(baseDir+"/"+i);
+      for (int j=0; j<5; j++) {
+        Path f = new Path(dir, Integer.toString(j));
+        FSDataOutputStream os = fs.create(f);
+        os.write(1);
+        os.close();
+      }
+    }
+
+    // checkpoint
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+    cluster.restartNameNode();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+
+    // Ensure all the files created above exist, proving they were loaded
+    // correctly
+    for (int i=0; i<10; i++) {
+      Path dir = new Path(baseDir+"/"+i);
+      assertTrue(fs.getFileStatus(dir).isDirectory());
+      for (int j=0; j<5; j++) {
+        Path f = new Path(dir, Integer.toString(j));
+        assertTrue(fs.exists(f));
+      }
+    }
+    return cluster;
+  }
+
+  @Test
+  public void testParallelSaveAndLoad() throws IOException {
+    Configuration conf = new Configuration();
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = createAndLoadParallelFSImage(conf);
+
+      // Obtain the image summary section to check the sub-sections
+      // are being correctly created when the image is saved.
+      FsImageProto.FileSummary summary = FSImageTestUtil.
+          getLatestImageSummary(cluster);
+      ArrayList<Section> sections = Lists.newArrayList(
+          summary.getSectionsList());
+
+      ArrayList<Section> inodeSubSections =
+          getSubSectionsOfName(sections, SectionName.INODE_SUB);
+      ArrayList<Section> dirSubSections =
+          getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB);
+      Section inodeSection =
+          getSubSectionsOfName(sections, SectionName.INODE).get(0);
+      Section dirSection = getSubSectionsOfName(sections,
+          SectionName.INODE_DIR).get(0);
+
+      // Expect 4 sub-sections for inodes and directories as target Sections
+      // is 4
+      assertEquals(4, inodeSubSections.size());
+      assertEquals(4, dirSubSections.size());
+
+      // Expect the sub-section offset and lengths do not overlap and cover a
+      // continuous range of the file. They should also line up with the parent
+      ensureSubSectionsAlignWithParent(inodeSubSections, inodeSection);
+      ensureSubSectionsAlignWithParent(dirSubSections, dirSection);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testNoParallelSectionsWithCompressionEnabled()
+      throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+        "org.apache.hadoop.io.compress.GzipCodec");
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = createAndLoadParallelFSImage(conf);
+
+      // Obtain the image summary section to check the sub-sections
+      // are being correctly created when the image is saved.
+      FsImageProto.FileSummary summary = FSImageTestUtil.
+          getLatestImageSummary(cluster);
+      ArrayList<Section> sections = Lists.newArrayList(
+          summary.getSectionsList());
+
+      ArrayList<Section> inodeSubSections =
+          getSubSectionsOfName(sections, SectionName.INODE_SUB);
+      ArrayList<Section> dirSubSections =
+          getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB);
+
+      // As compression is enabled, there should be no sub-sections in the
+      // image header
+      assertEquals(0, inodeSubSections.size());
+      assertEquals(0, dirSubSections.size());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void ensureSubSectionsAlignWithParent(ArrayList<Section> subSec,
+                                                Section parent) {
+    // For each sub-section, check its offset + length == the next section
+    // offset
+    for (int i=0; i<subSec.size()-1; i++) {
+      Section s = subSec.get(i);
+      long endOffset = s.getOffset() + s.getLength();
+      assertEquals(subSec.get(i+1).getOffset(), endOffset);
+    }
+    // The last sub-section should align with the parent section
+    Section lastSubSection = subSec.get(subSec.size()-1);
+    assertEquals(parent.getLength()+parent.getOffset(),
+        lastSubSection.getLength() + lastSubSection.getOffset());
+    // The first sub-section and parent section should have the same offset
+    assertEquals(parent.getOffset(), subSec.get(0).getOffset());
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
index 56fe483..6059fcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
@@ -142,7 +142,8 @@ public class TestFSImageWithSnapshot {
   private File saveFSImageToTempFile() throws IOException {
     SaveNamespaceContext context = new SaveNamespaceContext(fsn, txid,
         new Canceler());
-    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
+    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
+        conf);
     FSImageCompression compression = FSImageCompression.createCompression(conf);
     File imageFile = getImageFile(testDir, txid);
     fsn.readLock();

