Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/23 00:10:07 UTC
[hadoop] branch trunk updated: HDFS-14617. Improve fsimage load time by writing sub-sections to the fsimage index (#1028). Contributed by Stephen O'Donnell.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new b67812e HDFS-14617. Improve fsimage load time by writing sub-sections to the fsimage index (#1028). Contributed by Stephen O'Donnell.
b67812e is described below
commit b67812ea2111fa11bdd76096b923c93e1bdf2923
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri Aug 23 01:09:57 2019 +0100
HDFS-14617. Improve fsimage load time by writing sub-sections to the fsimage index (#1028). Contributed by Stephen O'Donnell.
Reviewed-by: He Xiaoqiao <he...@apache.org>
---
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 16 ++
.../hadoop/hdfs/server/namenode/FSImage.java | 3 +-
.../hdfs/server/namenode/FSImageFormatPBINode.java | 239 +++++++++++++++++--
.../server/namenode/FSImageFormatProtobuf.java | 252 ++++++++++++++++++++-
.../namenode/snapshot/FSImageFormatPBSnapshot.java | 9 +-
.../src/main/resources/hdfs-default.xml | 51 +++++
.../hdfs/server/namenode/FSImageTestUtil.java | 23 ++
.../hadoop/hdfs/server/namenode/TestFSImage.java | 154 ++++++++++++-
.../server/namenode/TestFSImageWithSnapshot.java | 3 +-
9 files changed, 719 insertions(+), 31 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9bd1246..95806de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -883,6 +883,22 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
+ public static final String DFS_IMAGE_PARALLEL_LOAD_KEY =
+ "dfs.image.parallel.load";
+ public static final boolean DFS_IMAGE_PARALLEL_LOAD_DEFAULT = true;
+
+ public static final String DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY =
+ "dfs.image.parallel.target.sections";
+ public static final int DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT = 12;
+
+ public static final String DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY =
+ "dfs.image.parallel.inode.threshold";
+ public static final int DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT = 1000000;
+
+ public static final String DFS_IMAGE_PARALLEL_THREADS_KEY =
+ "dfs.image.parallel.threads";
+ public static final int DFS_IMAGE_PARALLEL_THREADS_DEFAULT = 4;
+
// Edit Log segment transfer timeout
public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY =
"dfs.edit.log.transfer.timeout";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index cfba091..cea18b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -985,7 +985,8 @@ public class FSImage implements Closeable {
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
- FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
+ FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
+ conf);
FSImageCompression compression = FSImageCompression.createCompression(conf);
long numErrors = saver.save(newFile, compression);
if (numErrors > 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 6825a5c..d84e8c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -25,6 +25,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +95,8 @@ public final class FSImageFormatPBINode {
private static final Logger LOG =
LoggerFactory.getLogger(FSImageFormatPBINode.class);
+ private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000;
+
// the loader must decode all fields referencing serial number based fields
// via to<Item> methods with the string table.
public final static class Loader {
@@ -197,16 +204,66 @@ public final class FSImageFormatPBINode {
private final FSDirectory dir;
private final FSNamesystem fsn;
private final FSImageFormatProtobuf.Loader parent;
+ private ReentrantLock cacheNameMapLock;
+ private ReentrantLock blockMapLock;
Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
this.fsn = fsn;
this.dir = fsn.dir;
this.parent = parent;
+ cacheNameMapLock = new ReentrantLock(true);
+ blockMapLock = new ReentrantLock(true);
+ }
+
+ void loadINodeDirectorySectionInParallel(ExecutorService service,
+ ArrayList<FileSummary.Section> sections, String compressionCodec)
+ throws IOException {
+ LOG.info("Loading the INodeDirectory section in parallel with {} sub-" +
+ "sections", sections.size());
+ CountDownLatch latch = new CountDownLatch(sections.size());
+ final CopyOnWriteArrayList<IOException> exceptions =
+ new CopyOnWriteArrayList<>();
+ for (FileSummary.Section s : sections) {
+ service.submit(() -> {
+ InputStream ins = null;
+ try {
+ ins = parent.getInputStreamForSection(s,
+ compressionCodec);
+ loadINodeDirectorySection(ins);
+ } catch (Exception e) {
+ LOG.error("An exception occurred loading INodeDirectories in " +
+ "parallel", e);
+ exceptions.add(new IOException(e));
+ } finally {
+ latch.countDown();
+ try {
+ if (ins != null) {
+ ins.close();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close the input stream, ignoring", ioe);
+ }
+ }
+ });
+ }
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted waiting for countdown latch", e);
+ throw new IOException(e);
+ }
+ if (exceptions.size() != 0) {
+ LOG.error("{} exceptions occurred loading INodeDirectories",
+ exceptions.size());
+ throw exceptions.get(0);
+ }
+ LOG.info("Completed loading all INodeDirectory sub-sections");
}
void loadINodeDirectorySection(InputStream in) throws IOException {
final List<INodeReference> refList = parent.getLoaderContext()
.getRefList();
+ ArrayList<INode> inodeList = new ArrayList<>();
while (true) {
INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
.parseDelimitedFrom(in);
@@ -217,33 +274,159 @@ public final class FSImageFormatPBINode {
INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
for (long id : e.getChildrenList()) {
INode child = dir.getInode(id);
- addToParent(p, child);
+ if (addToParent(p, child)) {
+ if (child.isFile()) {
+ inodeList.add(child);
+ }
+ if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+ addToCacheAndBlockMap(inodeList);
+ inodeList.clear();
+ }
+ } else {
+ LOG.warn("Failed to add the inode {} to the directory {}",
+ child.getId(), p.getId());
+ }
}
+
for (int refId : e.getRefChildrenList()) {
INodeReference ref = refList.get(refId);
- addToParent(p, ref);
+ if (addToParent(p, ref)) {
+ if (ref.isFile()) {
+ inodeList.add(ref);
+ }
+ if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+ addToCacheAndBlockMap(inodeList);
+ inodeList.clear();
+ }
+ } else {
+ LOG.warn("Failed to add the inode reference {} to the directory {}",
+ ref.getId(), p.getId());
+ }
+ }
+ }
+ addToCacheAndBlockMap(inodeList);
+ }
+
+ private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
+ try {
+ cacheNameMapLock.lock();
+ for (INode i : inodeList) {
+ dir.cacheName(i);
+ }
+ } finally {
+ cacheNameMapLock.unlock();
+ }
+
+ try {
+ blockMapLock.lock();
+ for (INode i : inodeList) {
+ updateBlocksMap(i.asFile(), fsn.getBlockManager());
}
+ } finally {
+ blockMapLock.unlock();
}
}
void loadINodeSection(InputStream in, StartupProgress prog,
Step currentStep) throws IOException {
- INodeSection s = INodeSection.parseDelimitedFrom(in);
- fsn.dir.resetLastInodeId(s.getLastInodeId());
- long numInodes = s.getNumInodes();
- LOG.info("Loading " + numInodes + " INodes.");
- prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+ loadINodeSectionHeader(in, prog, currentStep);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
- for (int i = 0; i < numInodes; ++i) {
+ int totalLoaded = loadINodesInSection(in, counter);
+ LOG.info("Successfully loaded {} inodes", totalLoaded);
+ }
+
+ private int loadINodesInSection(InputStream in, Counter counter)
+ throws IOException {
+ // As the input stream is a LimitInputStream, the reading will stop when
+ // EOF is encountered at the end of the stream.
+ int cntr = 0;
+ while (true) {
INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+ if (p == null) {
+ break;
+ }
if (p.getId() == INodeId.ROOT_INODE_ID) {
- loadRootINode(p);
+ synchronized(this) {
+ loadRootINode(p);
+ }
} else {
INode n = loadINode(p);
- dir.addToInodeMap(n);
+ synchronized(this) {
+ dir.addToInodeMap(n);
+ }
+ }
+ cntr++;
+ if (counter != null) {
+ counter.increment();
+ }
+ }
+ return cntr;
+ }
+
+
+ private long loadINodeSectionHeader(InputStream in, StartupProgress prog,
+ Step currentStep) throws IOException {
+ INodeSection s = INodeSection.parseDelimitedFrom(in);
+ fsn.dir.resetLastInodeId(s.getLastInodeId());
+ long numInodes = s.getNumInodes();
+ LOG.info("Loading " + numInodes + " INodes.");
+ prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+ return numInodes;
+ }
+
+ void loadINodeSectionInParallel(ExecutorService service,
+ ArrayList<FileSummary.Section> sections,
+ String compressionCodec, StartupProgress prog,
+ Step currentStep) throws IOException {
+ LOG.info("Loading the INode section in parallel with {} sub-sections",
+ sections.size());
+ long expectedInodes = 0;
+ CountDownLatch latch = new CountDownLatch(sections.size());
+ AtomicInteger totalLoaded = new AtomicInteger(0);
+ final CopyOnWriteArrayList<IOException> exceptions =
+ new CopyOnWriteArrayList<>();
+
+ for (int i=0; i < sections.size(); i++) {
+ FileSummary.Section s = sections.get(i);
+ InputStream ins = parent.getInputStreamForSection(s, compressionCodec);
+ if (i == 0) {
+ // The first inode section has a header which must be processed first
+ expectedInodes = loadINodeSectionHeader(ins, prog, currentStep);
}
- counter.increment();
+ service.submit(() -> {
+ try {
+ totalLoaded.addAndGet(loadINodesInSection(ins, null));
+ prog.setCount(Phase.LOADING_FSIMAGE, currentStep,
+ totalLoaded.get());
+ } catch (Exception e) {
+ LOG.error("An exception occurred loading INodes in parallel", e);
+ exceptions.add(new IOException(e));
+ } finally {
+ latch.countDown();
+ try {
+ ins.close();
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close the input stream, ignoring", ioe);
+ }
+ }
+ });
+ }
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted waiting for countdown latch");
}
+ if (exceptions.size() != 0) {
+ LOG.error("{} exceptions occurred loading INodes", exceptions.size());
+ throw exceptions.get(0);
+ }
+ if (totalLoaded.get() != expectedInodes) {
+ throw new IOException("Expected to load "+expectedInodes+" in " +
+ "parallel, but loaded "+totalLoaded.get()+". The image may " +
+ "be corrupt.");
+ }
+ LOG.info("Completed loading all INode sections. Loaded {} inodes.",
+ totalLoaded.get());
}
/**
@@ -261,22 +444,18 @@ public final class FSImageFormatPBINode {
}
}
- private void addToParent(INodeDirectory parent, INode child) {
- if (parent == dir.rootDir && FSDirectory.isReservedName(child)) {
+ private boolean addToParent(INodeDirectory parentDir, INode child) {
+ if (parentDir == dir.rootDir && FSDirectory.isReservedName(child)) {
throw new HadoopIllegalArgumentException("File name \""
+ child.getLocalName() + "\" is reserved. Please "
+ " change the name of the existing file or directory to another "
+ "name before upgrading to this release.");
}
// NOTE: This does not update space counts for parents
- if (!parent.addChildAtLoading(child)) {
- return;
- }
- dir.cacheName(child);
-
- if (child.isFile()) {
- updateBlocksMap(child.asFile(), fsn.getBlockManager());
+ if (!parentDir.addChildAtLoading(child)) {
+ return false;
}
+ return true;
}
private INode loadINode(INodeSection.INode n) {
@@ -527,6 +706,7 @@ public final class FSImageFormatPBINode {
final ArrayList<INodeReference> refList = parent.getSaverContext()
.getRefList();
int i = 0;
+ int outputInodes = 0;
while (iter.hasNext()) {
INodeWithAdditionalFields n = iter.next();
if (!n.isDirectory()) {
@@ -558,6 +738,7 @@ public final class FSImageFormatPBINode {
refList.add(inode.asReference());
b.addRefChildren(refList.size() - 1);
}
+ outputInodes++;
}
INodeDirectorySection.DirEntry e = b.build();
e.writeDelimitedTo(out);
@@ -567,9 +748,15 @@ public final class FSImageFormatPBINode {
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
context.checkCancelled();
}
+ if (outputInodes >= parent.getInodesPerSubSection()) {
+ outputInodes = 0;
+ parent.commitSubSection(summary,
+ FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
+ }
}
- parent.commitSection(summary,
- FSImageFormatProtobuf.SectionName.INODE_DIR);
+ parent.commitSectionAndSubSection(summary,
+ FSImageFormatProtobuf.SectionName.INODE_DIR,
+ FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
}
void serializeINodeSection(OutputStream out) throws IOException {
@@ -589,8 +776,14 @@ public final class FSImageFormatPBINode {
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
context.checkCancelled();
}
+ if (i % parent.getInodesPerSubSection() == 0) {
+ parent.commitSubSection(summary,
+ FSImageFormatProtobuf.SectionName.INODE_SUB);
+ }
}
- parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
+ parent.commitSectionAndSubSection(summary,
+ FSImageFormatProtobuf.SectionName.INODE,
+ FSImageFormatProtobuf.SectionName.INODE_SUB);
}
void serializeFilesUCSection(OutputStream out) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index b887a14..3144d4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -40,7 +40,11 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -150,6 +154,8 @@ public final class FSImageFormatProtobuf {
*/
private final boolean requireSameLayoutVersion;
+ private File filename;
+
Loader(Configuration conf, FSNamesystem fsn,
boolean requireSameLayoutVersion) {
this.conf = conf;
@@ -229,6 +235,7 @@ public final class FSImageFormatProtobuf {
}
void load(File file) throws IOException {
+ filename = file;
long start = Time.monotonicNow();
DigestThread dt = new DigestThread(file);
dt.start();
@@ -250,6 +257,96 @@ public final class FSImageFormatProtobuf {
}
}
+ /**
+ * Given an FSImage FileSummary.Section, return a LimitInputStream set to
+ * the starting position of the section and limited to the section length.
+ * @param section The FileSummary.Section containing the offset and length
+ * @param compressionCodec The compression codec in use, if any
+ * @return An InputStream for the given section
+ * @throws IOException
+ */
+ public InputStream getInputStreamForSection(FileSummary.Section section,
+ String compressionCodec)
+ throws IOException {
+ FileInputStream fin = new FileInputStream(filename);
+ FileChannel channel = fin.getChannel();
+ channel.position(section.getOffset());
+ InputStream in = new BufferedInputStream(new LimitInputStream(fin,
+ section.getLength()));
+
+ in = FSImageUtil.wrapInputStreamForCompression(conf,
+ compressionCodec, in);
+ return in;
+ }
+
+ /**
+ * Takes an ArrayList of Sections and removes all Sections whose
+ * name ends in _SUB, indicating they are sub-sections. The original
+ * ArrayList is modified and a new list of the removed Sections is
+ * returned.
+ * @param sections Array List containing all Sections and Sub Sections
+ * in the image.
+ * @return ArrayList of the sections removed, or an empty list if none are
+ * removed.
+ */
+ private ArrayList<FileSummary.Section> getAndRemoveSubSections(
+ ArrayList<FileSummary.Section> sections) {
+ ArrayList<FileSummary.Section> subSections = new ArrayList<>();
+ Iterator<FileSummary.Section> iter = sections.iterator();
+ while (iter.hasNext()) {
+ FileSummary.Section s = iter.next();
+ String name = s.getName();
+ if (name.matches(".*_SUB$")) {
+ subSections.add(s);
+ iter.remove();
+ }
+ }
+ return subSections;
+ }
+
+ /**
+ * Given an ArrayList of Sections, return all Sections with the given
+ * name, or an empty list if none are found.
+ * @param sections ArrayList of the Sections to search through
+ * @param name The name of the Sections to search for
+ * @return ArrayList of the sections matching the given name
+ */
+ private ArrayList<FileSummary.Section> getSubSectionsOfName(
+ ArrayList<FileSummary.Section> sections, SectionName name) {
+ ArrayList<FileSummary.Section> subSec = new ArrayList<>();
+ for (FileSummary.Section s : sections) {
+ String n = s.getName();
+ SectionName sectionName = SectionName.fromString(n);
+ if (sectionName == name) {
+ subSec.add(s);
+ }
+ }
+ return subSec;
+ }
+
+ /**
+ * Checks the number of threads configured for parallel loading and
+ * returns an ExecutorService with the configured number of threads. If
+ * the thread count is set to less than 1, it will be reset to the
+ * default value.
+ * @return ExecutorService with the correct number of threads
+ */
+ private ExecutorService getParallelExecutorService() {
+ int threads = conf.getInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
+ if (threads < 1) {
+ LOG.warn("Parallel is enabled and {} is set to {}. Setting to the " +
+ "default value {}", DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
+ threads, DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
+ threads = DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT;
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ threads);
+ LOG.info("The fsimage will be loaded in parallel using {} threads",
+ threads);
+ return executorService;
+ }
+
private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
throws IOException {
if (!FSImageUtil.checkFileFormat(raFile)) {
@@ -294,6 +391,14 @@ public final class FSImageFormatProtobuf {
* a particular step to be started for once.
*/
Step currentStep = null;
+ boolean loadInParallel = enableParallelSaveAndLoad(conf);
+
+ ExecutorService executorService = null;
+ ArrayList<FileSummary.Section> subSections =
+ getAndRemoveSubSections(sections);
+ if (loadInParallel) {
+ executorService = getParallelExecutorService();
+ }
for (FileSummary.Section s : sections) {
channel.position(s.getOffset());
@@ -308,6 +413,8 @@ public final class FSImageFormatProtobuf {
if (sectionName == null) {
throw new IOException("Unrecognized section " + n);
}
+
+ ArrayList<FileSummary.Section> stageSubSections;
switch (sectionName) {
case NS_INFO:
loadNameSystemSection(in);
@@ -318,14 +425,28 @@ public final class FSImageFormatProtobuf {
case INODE: {
currentStep = new Step(StepType.INODES);
prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
- inodeLoader.loadINodeSection(in, prog, currentStep);
+ stageSubSections = getSubSectionsOfName(
+ subSections, SectionName.INODE_SUB);
+ if (loadInParallel && (stageSubSections.size() > 0)) {
+ inodeLoader.loadINodeSectionInParallel(executorService,
+ stageSubSections, summary.getCodec(), prog, currentStep);
+ } else {
+ inodeLoader.loadINodeSection(in, prog, currentStep);
+ }
}
break;
case INODE_REFERENCE:
snapshotLoader.loadINodeReferenceSection(in);
break;
case INODE_DIR:
- inodeLoader.loadINodeDirectorySection(in);
+ stageSubSections = getSubSectionsOfName(
+ subSections, SectionName.INODE_DIR_SUB);
+ if (loadInParallel && stageSubSections.size() > 0) {
+ inodeLoader.loadINodeDirectorySectionInParallel(executorService,
+ stageSubSections, summary.getCodec());
+ } else {
+ inodeLoader.loadINodeDirectorySection(in);
+ }
break;
case FILES_UNDERCONSTRUCTION:
inodeLoader.loadFilesUnderConstructionSection(in);
@@ -362,6 +483,9 @@ public final class FSImageFormatProtobuf {
break;
}
}
+ if (executorService != null) {
+ executorService.shutdown();
+ }
}
private void loadNameSystemSection(InputStream in) throws IOException {
@@ -450,12 +574,34 @@ public final class FSImageFormatProtobuf {
}
}
+ private static boolean enableParallelSaveAndLoad(Configuration conf) {
+ boolean loadInParallel =
+ conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
+ boolean compressionEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
+ DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
+
+ if (loadInParallel) {
+ if (compressionEnabled) {
+ LOG.warn("Parallel Image loading and saving is not supported when {}" +
+ " is set to true. Parallel will be disabled.",
+ DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY);
+ loadInParallel = false;
+ }
+ }
+ return loadInParallel;
+ }
+
public static final class Saver {
public static final int CHECK_CANCEL_INTERVAL = 4096;
+ private boolean writeSubSections = false;
+ private int inodesPerSubSection = Integer.MAX_VALUE;
private final SaveNamespaceContext context;
private final SaverContext saverContext;
private long currentOffset = FSImageUtil.MAGIC_HEADER.length;
+ private long subSectionOffset = currentOffset;
private MD5Hash savedDigest;
private FileChannel fileChannel;
@@ -463,10 +609,12 @@ public final class FSImageFormatProtobuf {
private OutputStream sectionOutputStream;
private CompressionCodec codec;
private OutputStream underlyingOutputStream;
+ private Configuration conf;
- Saver(SaveNamespaceContext context) {
+ Saver(SaveNamespaceContext context, Configuration conf) {
this.context = context;
this.saverContext = new SaverContext();
+ this.conf = conf;
}
public MD5Hash getSavedDigest() {
@@ -481,6 +629,29 @@ public final class FSImageFormatProtobuf {
return saverContext;
}
+ public int getInodesPerSubSection() {
+ return inodesPerSubSection;
+ }
+
+ public boolean shouldWriteSubSections() {
+ return writeSubSections;
+ }
+
+ /**
+ * Commit the length and offset of an fsimage section to the summary index,
+ * including the sub-section, which will be committed before the section is
+ * committed.
+ * @param summary The image summary object
+ * @param name The name of the section to commit
+ * @param subSectionName The name of the sub-section to commit
+ * @throws IOException
+ */
+ public void commitSectionAndSubSection(FileSummary.Builder summary,
+ SectionName name, SectionName subSectionName) throws IOException {
+ commitSubSection(summary, subSectionName);
+ commitSection(summary, name);
+ }
+
public void commitSection(FileSummary.Builder summary, SectionName name)
throws IOException {
long oldOffset = currentOffset;
@@ -495,6 +666,35 @@ public final class FSImageFormatProtobuf {
summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
.setLength(length).setOffset(currentOffset));
currentOffset += length;
+ subSectionOffset = currentOffset;
+ }
+
+ /**
+ * Commit the length and offset of a fsimage sub-section to the summary
+ * index.
+ * @param summary The image summary object
+ * @param name The name of the sub-section to commit
+ * @throws IOException
+ */
+ public void commitSubSection(FileSummary.Builder summary, SectionName name)
+ throws IOException {
+ if (!writeSubSections) {
+ return;
+ }
+
+ LOG.debug("Saving a subsection for {}", name.toString());
+ // The output stream must be flushed before the length is obtained
+ // as the flush can move the length forward.
+ sectionOutputStream.flush();
+ long length = fileChannel.position() - subSectionOffset;
+ if (length == 0) {
+ LOG.warn("The requested section for {} is empty. It will not be " +
+ "output to the image", name.toString());
+ return;
+ }
+ summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
+ .setLength(length).setOffset(subSectionOffset));
+ subSectionOffset += length;
}
private void flushSectionOutputStream() throws IOException {
@@ -509,6 +709,7 @@ public final class FSImageFormatProtobuf {
* @throws IOException on fatal error.
*/
long save(File file, FSImageCompression compression) throws IOException {
+ enableSubSectionsIfRequired();
FileOutputStream fout = new FileOutputStream(file);
fileChannel = fout.getChannel();
try {
@@ -525,6 +726,47 @@ public final class FSImageFormatProtobuf {
}
}
+ private void enableSubSectionsIfRequired() {
+ boolean parallelEnabled = enableParallelSaveAndLoad(conf);
+ int inodeThreshold = conf.getInt(
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
+ int targetSections = conf.getInt(
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
+
+ if (parallelEnabled) {
+ if (targetSections <= 0) {
+ LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
+ " default of {}",
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
+ targetSections,
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
+ targetSections =
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT;
+ }
+ if (inodeThreshold <= 0) {
+ LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
+ " default of {}",
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
+ inodeThreshold,
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
+ inodeThreshold =
+ DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT;
+ }
+ int inodeCount = context.getSourceNamesystem().dir.getInodeMapSize();
+ // Only enable parallel sections if there are enough inodes
+ if (inodeCount >= inodeThreshold) {
+ writeSubSections = true;
+ // Calculate the inodes per section rounded up to the nearest int
+ inodesPerSubSection = (inodeCount + targetSections - 1) /
+ targetSections;
+ }
+ } else {
+ writeSubSections = false;
+ }
+ }
+
private static void saveFileSummary(OutputStream out, FileSummary summary)
throws IOException {
summary.writeDelimitedTo(out);
@@ -737,11 +979,15 @@ public final class FSImageFormatProtobuf {
EXTENDED_ACL("EXTENDED_ACL"),
ERASURE_CODING("ERASURE_CODING"),
INODE("INODE"),
+ INODE_SUB("INODE_SUB"),
INODE_REFERENCE("INODE_REFERENCE"),
+ INODE_REFERENCE_SUB("INODE_REFERENCE_SUB"),
SNAPSHOT("SNAPSHOT"),
INODE_DIR("INODE_DIR"),
+ INODE_DIR_SUB("INODE_DIR_SUB"),
FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
+ SNAPSHOT_DIFF_SUB("SNAPSHOT_DIFF_SUB"),
SECRET_MANAGER("SECRET_MANAGER"),
CACHE_MANAGER("CACHE_MANAGER");
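
A quick worked example of the rounding-up division used in enableSubSectionsIfRequired above; the inode count here is hypothetical and not taken from the patch:

    public class SubSectionSizingSketch {
      public static void main(String[] args) {
        int inodeCount = 10_000_000;   // hypothetical namespace size
        int targetSections = 12;       // dfs.image.parallel.target.sections
        // Round up so that targetSections * inodesPerSubSection >= inodeCount.
        int inodesPerSubSection = (inodeCount + targetSections - 1) / targetSections;
        // Prints 833334: the saver commits a new INODE_SUB entry roughly
        // every 833,334 inodes, yielding 12 sub-sections in the index.
        System.out.println(inodesPerSubSection);
      }
    }
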
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index 2157554..cd5051d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@ -529,9 +529,14 @@ public class FSImageFormatPBSnapshot {
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
context.checkCancelled();
}
+ if (i % parent.getInodesPerSubSection() == 0) {
+ parent.commitSubSection(headers,
+ FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
+ }
}
- parent.commitSection(headers,
- FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF);
+ parent.commitSectionAndSubSection(headers,
+ FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF,
+ FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
}
private void serializeFileDiffList(INodeFile file, OutputStream out)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 79811aa..74c4f40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1386,6 +1386,57 @@
</property>
<property>
+ <name>dfs.image.parallel.load</name>
+ <value>true</value>
+ <description>
+ If true, write sub-section entries to the fsimage index so it can
+ be loaded in parallel. Also controls whether parallel loading
+ will be used for an image previously created with sub-sections.
+ If the image contains sub-sections and this is set to false,
+ parallel loading will not be used.
+ Parallel loading is not compatible with image compression,
+ so if dfs.image.compress is set to true this setting will be
+ ignored and no parallel loading will occur.
+ </description>
+</property>
+
+<property>
+ <name>dfs.image.parallel.target.sections</name>
+ <value>12</value>
+ <description>
+ Controls the number of sub-sections that will be written to
+ fsimage for each section. This should be larger than
+ dfs.image.parallel.threads, otherwise not all threads will be
+ used when loading. Ideally, have at least twice as many target
+ sections as threads, so each thread must load more than one
+ section, avoiding one long-running section dominating the load time.
+ </description>
+</property>
+
+<property>
+ <name>dfs.image.parallel.inode.threshold</name>
+ <value>1000000</value>
+ <description>
+ If the image contains fewer inodes than this setting, then
+ do not write sub-sections and hence disable parallel loading.
+ This is because small images load very quickly in serial and
+ parallel loading is not needed.
+ </description>
+</property>
+
+<property>
+ <name>dfs.image.parallel.threads</name>
+ <value>4</value>
+ <description>
+ The number of threads to use when dfs.image.parallel.load is
+ enabled. This setting should be less than
+ dfs.image.parallel.target.sections. The optimal number of
+ threads will depend on the hardware and environment.
+ </description>
+</property>
+
+<property>
<name>dfs.edit.log.transfer.timeout</name>
<value>30000</value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
index 985ab35..c82d317 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
@@ -606,4 +606,27 @@ public abstract class FSImageTestUtil {
getStorageDirectory(storageUri);
return NNStorage.readTransactionIdFile(sDir);
}
+
+ /**
+ * Returns the summary section from the latest fsimage stored on the cluster.
+ * This is effectively the image index which contains the offset of each
+ * section and subsection.
+ * @param cluster The cluster to load the image from
+ * @return The FileSummary section of the fsimage
+ * @throws IOException
+ */
+ public static FsImageProto.FileSummary getLatestImageSummary(
+ MiniDFSCluster cluster) throws IOException {
+ RandomAccessFile raFile = null;
+ try {
+ File image = FSImageTestUtil.findLatestImageFile(FSImageTestUtil
+ .getFSImage(cluster.getNameNode()).getStorage().getStorageDir(0));
+ raFile = new RandomAccessFile(image, "r");
+ return FSImageUtil.loadSummary(raFile);
+ } finally {
+ if (raFile != null) {
+ raFile.close();
+ }
+ }
+ }
}
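
A hedged sketch of how the new getLatestImageSummary helper might be used from a test to list the sub-section entries in the image index; the running MiniDFSCluster passed in and the usual test imports are assumed, and this is illustrative only, not part of the patch:

    static void printSubSections(MiniDFSCluster cluster) throws IOException {
      FsImageProto.FileSummary summary =
          FSImageTestUtil.getLatestImageSummary(cluster);
      for (FsImageProto.FileSummary.Section s : summary.getSectionsList()) {
        // Sub-section entries written by the saver end in _SUB.
        if (s.getName().endsWith("_SUB")) {
          System.out.println(s.getName() + " offset=" + s.getOffset()
              + " length=" + s.getLength());
        }
      }
    }
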
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 0beb758..793a749 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -32,8 +32,10 @@ import java.io.DataInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.EnumSet;
+import com.google.common.collect.Lists;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -72,6 +74,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary.Section;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
@@ -1000,4 +1004,152 @@ public class TestFSImage {
}
throw new AssertionError("Policy is not found!");
}
-}
+
+ private ArrayList<Section> getSubSectionsOfName(ArrayList<Section> sections,
+ FSImageFormatProtobuf.SectionName name) {
+ ArrayList<Section> subSec = new ArrayList<>();
+ for (Section s : sections) {
+ if (s.getName().equals(name.toString())) {
+ subSec.add(s);
+ }
+ }
+ return subSec;
+ }
+
+ private MiniDFSCluster createAndLoadParallelFSImage(Configuration conf)
+ throws IOException {
+ conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true");
+ conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1");
+ conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4");
+ conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4");
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ // Create 10 directories, each containing 5 files
+ String baseDir = "/abc/def";
+ for (int i=0; i<10; i++) {
+ Path dir = new Path(baseDir+"/"+i);
+ for (int j=0; j<5; j++) {
+ Path f = new Path(dir, Integer.toString(j));
+ FSDataOutputStream os = fs.create(f);
+ os.write(1);
+ os.close();
+ }
+ }
+
+ // checkpoint
+ fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+ fs.saveNamespace();
+ fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+ cluster.restartNameNode();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+
+ // Ensure all the files created above exist, proving they were loaded
+ // correctly
+ for (int i=0; i<10; i++) {
+ Path dir = new Path(baseDir+"/"+i);
+ assertTrue(fs.getFileStatus(dir).isDirectory());
+ for (int j=0; j<5; j++) {
+ Path f = new Path(dir, Integer.toString(j));
+ assertTrue(fs.exists(f));
+ }
+ }
+ return cluster;
+ }
+
+ @Test
+ public void testParallelSaveAndLoad() throws IOException {
+ Configuration conf = new Configuration();
+
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = createAndLoadParallelFSImage(conf);
+
+ // Obtain the image summary section to check the sub-sections
+ // are being correctly created when the image is saved.
+ FsImageProto.FileSummary summary = FSImageTestUtil.
+ getLatestImageSummary(cluster);
+ ArrayList<Section> sections = Lists.newArrayList(
+ summary.getSectionsList());
+
+ ArrayList<Section> inodeSubSections =
+ getSubSectionsOfName(sections, SectionName.INODE_SUB);
+ ArrayList<Section> dirSubSections =
+ getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB);
+ Section inodeSection =
+ getSubSectionsOfName(sections, SectionName.INODE).get(0);
+ Section dirSection = getSubSectionsOfName(sections,
+ SectionName.INODE_DIR).get(0);
+
+ // Expect 4 sub-sections for inodes and directories as target Sections
+ // is 4
+ assertEquals(4, inodeSubSections.size());
+ assertEquals(4, dirSubSections.size());
+
+ // Expect the sub-section offset and lengths do not overlap and cover a
+ // continuous range of the file. They should also line up with the parent
+ ensureSubSectionsAlignWithParent(inodeSubSections, inodeSection);
+ ensureSubSectionsAlignWithParent(dirSubSections, dirSection);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testNoParallelSectionsWithCompressionEnabled()
+ throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+ conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+ "org.apache.hadoop.io.compress.GzipCodec");
+
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = createAndLoadParallelFSImage(conf);
+
+ // Obtain the image summary section to check the sub-sections
+ // are being correctly created when the image is saved.
+ FsImageProto.FileSummary summary = FSImageTestUtil.
+ getLatestImageSummary(cluster);
+ ArrayList<Section> sections = Lists.newArrayList(
+ summary.getSectionsList());
+
+ ArrayList<Section> inodeSubSections =
+ getSubSectionsOfName(sections, SectionName.INODE_SUB);
+ ArrayList<Section> dirSubSections =
+ getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB);
+
+ // As compression is enabled, there should be no sub-sections in the
+ // image header
+ assertEquals(0, inodeSubSections.size());
+ assertEquals(0, dirSubSections.size());
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private void ensureSubSectionsAlignWithParent(ArrayList<Section> subSec,
+ Section parent) {
+ // For each sub-section, check its offset + length == the next section
+ // offset
+ for (int i=0; i<subSec.size()-1; i++) {
+ Section s = subSec.get(i);
+ long endOffset = s.getOffset() + s.getLength();
+ assertEquals(subSec.get(i+1).getOffset(), endOffset);
+ }
+ // The last sub-section should align with the parent section
+ Section lastSubSection = subSec.get(subSec.size()-1);
+ assertEquals(parent.getLength()+parent.getOffset(),
+ lastSubSection.getLength() + lastSubSection.getOffset());
+ // The first sub-section and parent section should have the same offset
+ assertEquals(parent.getOffset(), subSec.get(0).getOffset());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
index b8b4ffc..0769a7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
@@ -143,7 +143,8 @@ public class TestFSImageWithSnapshot {
private File saveFSImageToTempFile() throws IOException {
SaveNamespaceContext context = new SaveNamespaceContext(fsn, txid,
new Canceler());
- FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
+ FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
+ conf);
FSImageCompression compression = FSImageCompression.createCompression(conf);
File imageFile = getImageFile(testDir, txid);
fsn.readLock();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org