Posted to common-commits@hadoop.apache.org by he...@apache.org on 2022/03/22 14:29:55 UTC

[hadoop] branch trunk updated: HDFS-14617. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang.

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8897549  HDFS-14617. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang.
8897549 is described below

commit 88975496d8a076b8923999e9e9ecef13e3721e3d
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Tue Mar 22 22:28:09 2022 +0800

    HDFS-14617. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang.
    
    Signed-off-by: He Xiaoqiao <he...@apache.org>
---
 .../offlineImageViewer/OfflineImageViewerPB.java   |  10 +-
 .../PBImageCorruptionDetector.java                 |   2 +-
 .../PBImageDelimitedTextWriter.java                |   8 +-
 .../offlineImageViewer/PBImageTextWriter.java      | 267 ++++++++++++++++++---
 .../offlineImageViewer/TestOfflineImageViewer.java |  59 ++++-
 .../TestOfflineImageViewerForAcl.java              |   2 +-
 6 files changed, 306 insertions(+), 42 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
index dbcb452..05e687a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
@@ -107,6 +107,7 @@ public class OfflineImageViewerPB {
       + "                       Delimited outputs. If not set, the processor\n"
       + "                       constructs the namespace in memory \n"
       + "                       before outputting text.\n"
+      + "-m,--multiThread <arg> Use multiThread to process sub-sections.\n"
       + "-h,--help              Display usage information and exit\n";
 
   /**
@@ -132,6 +133,7 @@ public class OfflineImageViewerPB {
     options.addOption("delimiter", true, "");
     options.addOption("sp", false, "");
     options.addOption("t", "temp", true, "");
+    options.addOption("m", "multiThread", true, "");
 
     return options;
   }
@@ -185,6 +187,7 @@ public class OfflineImageViewerPB {
     String delimiter = cmd.getOptionValue("delimiter",
         PBImageTextWriter.DEFAULT_DELIMITER);
     String tempPath = cmd.getOptionValue("t", "");
+    int threads = Integer.parseInt(cmd.getOptionValue("m", "1"));
 
     Configuration conf = new Configuration();
     PrintStream out = null;
@@ -227,15 +230,14 @@ public class OfflineImageViewerPB {
         boolean printStoragePolicy = cmd.hasOption("sp");
         try (PBImageDelimitedTextWriter writer =
             new PBImageDelimitedTextWriter(out, delimiter,
-                tempPath, printStoragePolicy);
-            RandomAccessFile r = new RandomAccessFile(inputFile, "r")) {
-          writer.visit(r);
+                tempPath, printStoragePolicy, threads, outputFile)) {
+          writer.visit(inputFile);
         }
         break;
       case "DETECTCORRUPTION":
         try (PBImageCorruptionDetector detector =
             new PBImageCorruptionDetector(out, delimiter, tempPath)) {
-          detector.visit(new RandomAccessFile(inputFile, "r"));
+          detector.visit(inputFile);
         }
         break;
       default:
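
The new "-m" option only takes the parallel path when the output is a real
file: "-o -" (stdout) keeps the serial path, because the per-thread tmp files
are derived from the output file name. Below is a minimal sketch of driving
the Delimited processor programmatically, mirroring the test usage further
down; both paths are placeholders:

    // Roughly equivalent to: hdfs oiv -p Delimited -i <fsimage> -o <out> -m 4
    int ret = OfflineImageViewerPB.run(new String[] {
        "-p", "Delimited",
        "-i", "/path/to/fsimage",    // placeholder input fsimage
        "-o", "/path/to/out.txt",    // placeholder output file
        "-m", "4"});                 // parse INODE_SUB sections with 4 threads
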
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java
index 28c4507..1759386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java
@@ -337,7 +337,7 @@ public class PBImageCorruptionDetector extends PBImageTextWriter {
         if (parentId != -1) {
           entryBuilder.setParentId(parentId);
         }
-        printIfNotEmpty(entryBuilder.build());
+        printIfNotEmpty(serialOutStream(), entryBuilder.build());
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java
index 45d42f0..3e080ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java
@@ -146,7 +146,13 @@ public class PBImageDelimitedTextWriter extends PBImageTextWriter {
   PBImageDelimitedTextWriter(PrintStream out, String delimiter,
                              String tempPath, boolean printStoragePolicy)
       throws IOException {
-    super(out, delimiter, tempPath);
+    this(out, delimiter, tempPath, printStoragePolicy, 1, "-");
+  }
+
+  PBImageDelimitedTextWriter(PrintStream out, String delimiter,
+      String tempPath, boolean printStoragePolicy, int threads,
+      String parallelOut) throws IOException {
+    super(out, delimiter, tempPath, threads, parallelOut);
     this.printStoragePolicy = printStoragePolicy;
   }
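
The original four-argument constructor now delegates with threads = 1 and "-"
as the parallel output file, which preserves the old serial behaviour. A
minimal sketch of the new constructor in use (it is package-private, so this
assumes same-package code; paths are placeholders):

    try (PrintStream out = new PrintStream("serial-out.txt");
         PBImageDelimitedTextWriter writer = new PBImageDelimitedTextWriter(
             out, "\t", "", false, 4, "parallel-out.txt")) {
      writer.visit("/path/to/fsimage");  // placeholder fsimage path
    }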
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
index 08fe7fb..2dab44a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
@@ -21,17 +21,25 @@ import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -455,20 +463,22 @@ abstract class PBImageTextWriter implements Closeable {
         return "/";
       }
       long parent = getFromDirChildMap(inode);
-      if (!dirPathCache.containsKey(parent)) {
-        byte[] bytes = dirMap.get(toBytes(parent));
-        if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
-          // The parent is an INodeReference, which is generated from snapshot.
-          // For delimited oiv tool, no need to print out metadata in snapshots.
-          throw PBImageTextWriter.createIgnoredSnapshotException(inode);
+      byte[] bytes = dirMap.get(toBytes(parent));
+      synchronized (this) {
+        if (!dirPathCache.containsKey(parent)) {
+          if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
+            // The parent is an INodeReference, which is generated from snapshot.
+            // For delimited oiv tool, no need to print out metadata in snapshots.
+            throw PBImageTextWriter.createIgnoredSnapshotException(inode);
+          }
+          String parentName = toString(bytes);
+          String parentPath =
+              new Path(getParentPath(parent),
+                  parentName.isEmpty() ? "/" : parentName).toString();
+          dirPathCache.put(parent, parentPath);
         }
-        String parentName = toString(bytes);
-        String parentPath =
-            new Path(getParentPath(parent),
-                parentName.isEmpty() ? "/" : parentName).toString();
-        dirPathCache.put(parent, parentPath);
+        return dirPathCache.get(parent);
       }
-      return dirPathCache.get(parent);
     }
 
     @Override
@@ -493,9 +503,12 @@ abstract class PBImageTextWriter implements Closeable {
   }
 
   private SerialNumberManager.StringTable stringTable;
-  private PrintStream out;
+  private final PrintStream out;
   private MetadataMap metadataMap = null;
   private String delimiter;
+  private File filename;
+  private int numThreads;
+  private String parallelOutputFile;
 
   /**
    * Construct a PB FsImage writer to generate text file.
@@ -503,8 +516,8 @@ abstract class PBImageTextWriter implements Closeable {
    * @param tempPath the path to store metadata. If it is empty, store metadata
    *                 in memory instead.
    */
-  PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
-      throws IOException {
+  PBImageTextWriter(PrintStream out, String delimiter, String tempPath,
+      int numThreads, String parallelOutputFile) throws IOException {
     this.out = out;
     this.delimiter = delimiter;
     if (tempPath.isEmpty()) {
@@ -512,6 +525,17 @@ abstract class PBImageTextWriter implements Closeable {
     } else {
       metadataMap = new LevelDBMetadataMap(tempPath);
     }
+    this.numThreads = numThreads;
+    this.parallelOutputFile = parallelOutputFile;
+  }
+
+  PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
+      throws IOException {
+    this(out, delimiter, tempPath, 1, "-");
+  }
+
+  protected PrintStream serialOutStream() {
+    return out;
   }
 
   @Override
@@ -562,7 +586,9 @@ abstract class PBImageTextWriter implements Closeable {
    */
   abstract protected void afterOutput() throws IOException;
 
-  public void visit(RandomAccessFile file) throws IOException {
+  public void visit(String filePath) throws IOException {
+    filename = new File(filePath);
+    RandomAccessFile file = new RandomAccessFile(filePath, "r");
     Configuration conf = new Configuration();
     if (!FSImageUtil.checkFileFormat(file)) {
       throw new IOException("Unrecognized FSImage");
@@ -642,21 +668,122 @@ abstract class PBImageTextWriter implements Closeable {
   private void output(Configuration conf, FileSummary summary,
       FileInputStream fin, ArrayList<FileSummary.Section> sections)
       throws IOException {
+    ArrayList<FileSummary.Section> allINodeSubSections =
+        getINodeSubSections(sections);
+    if (numThreads > 1 && !parallelOutputFile.equals("-") &&
+        allINodeSubSections.size() > 1) {
+      outputInParallel(conf, summary, allINodeSubSections);
+    } else {
+      LOG.info("Serial output due to threads num: {}, parallel output file: {}, " +
+          "subSections: {}.", numThreads, parallelOutputFile, allINodeSubSections.size());
+      outputInSerial(conf, summary, fin, sections);
+    }
+  }
+
+  private void outputInSerial(Configuration conf, FileSummary summary,
+      FileInputStream fin, ArrayList<FileSummary.Section> sections)
+      throws IOException {
     InputStream is;
     long startTime = Time.monotonicNow();
-    out.println(getHeader());
+    serialOutStream().println(getHeader());
     for (FileSummary.Section section : sections) {
       if (SectionName.fromString(section.getName()) == SectionName.INODE) {
         fin.getChannel().position(section.getOffset());
         is = FSImageUtil.wrapInputStreamForCompression(conf,
             summary.getCodec(), new BufferedInputStream(new LimitInputStream(
                 fin, section.getLength())));
-        outputINodes(is);
+        INodeSection s = INodeSection.parseDelimitedFrom(is);
+        LOG.info("Found {} INodes in the INode section", s.getNumInodes());
+        int count = outputINodes(is, serialOutStream());
+        LOG.info("Outputted {} INodes.", count);
       }
     }
     afterOutput();
     long timeTaken = Time.monotonicNow() - startTime;
-    LOG.debug("Time to output inodes: {}ms", timeTaken);
+    LOG.debug("Time to output inodes: {} ms", timeTaken);
+  }
+
+  /**
+   * Step 1: Process the sub-sections with multiple threads.
+   * Given n (n > 1) threads to process k (k >= n) sections, write the
+   * parsed results of each section to a tmp file, in section order.
+   * Step 2: Merge the tmp files.
+   */
+  private void outputInParallel(Configuration conf, FileSummary summary,
+      ArrayList<FileSummary.Section> subSections)
+      throws IOException {
+    int nThreads = Integer.min(numThreads, subSections.size());
+    LOG.info("Outputting in parallel with {} sub-sections using {} threads",
+        subSections.size(), nThreads);
+    final CopyOnWriteArrayList<IOException> exceptions = new CopyOnWriteArrayList<>();
+    CountDownLatch latch = new CountDownLatch(subSections.size());
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+    AtomicLong expectedINodes = new AtomicLong(0);
+    AtomicLong totalParsed = new AtomicLong(0);
+    String codec = summary.getCodec();
+    String[] paths = new String[subSections.size()];
+
+    for (int i = 0; i < subSections.size(); i++) {
+      paths[i] = parallelOutputFile + ".tmp." + i;
+      int index = i;
+      executorService.submit(() -> {
+        LOG.info("Output iNodes of section-{}", index);
+        InputStream is = null;
+        try (PrintStream outStream = new PrintStream(paths[index], "UTF-8")) {
+          long startTime = Time.monotonicNow();
+          is = getInputStreamForSection(subSections.get(index), codec, conf);
+          if (index == 0) {
+            // The first sub-section carries the INodeSection header, which must be parsed first
+            INodeSection s = INodeSection.parseDelimitedFrom(is);
+            expectedINodes.set(s.getNumInodes());
+          }
+          totalParsed.addAndGet(outputINodes(is, outStream));
+          long timeTaken = Time.monotonicNow() - startTime;
+          LOG.info("Time to output iNodes of section-{}: {} ms", index, timeTaken);
+        } catch (Exception e) {
+          exceptions.add(new IOException(e));
+        } finally {
+          latch.countDown();
+          try {
+            if (is != null) {
+              is.close();
+            }
+          } catch (IOException ioe) {
+            LOG.warn("Failed to close the input stream, ignoring", ioe);
+          }
+        }
+      });
+    }
+
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for countdown latch", e);
+      throw new IOException(e);
+    }
+
+    executorService.shutdown();
+    if (exceptions.size() != 0) {
+      LOG.error("Failed to output INode sub-sections, {} exception(s) occurred.",
+          exceptions.size());
+      throw exceptions.get(0);
+    }
+    if (totalParsed.get() != expectedINodes.get()) {
+      throw new IOException("Expected to parse " + expectedINodes + " in parallel, " +
+          "but parsed " + totalParsed.get() + ". The image may be corrupt.");
+    }
+    LOG.info("Completed outputting all INode sub-sections to {} tmp files.",
+        subSections.size());
+
+    try (PrintStream ps = new PrintStream(parallelOutputFile, "UTF-8")) {
+      ps.println(getHeader());
+    }
+
+    // merge tmp files
+    long startTime = Time.monotonicNow();
+    mergeFiles(paths, parallelOutputFile);
+    long timeTaken = Time.monotonicNow() - startTime;
+    LOG.info("Completed all stages. Time to merge files: {} ms", timeTaken);
   }
 
   protected PermissionStatus getPermission(long perm) {
@@ -763,22 +890,27 @@ abstract class PBImageTextWriter implements Closeable {
     LOG.info("Scanned {} INode directories to build namespace.", count);
   }
 
-  void printIfNotEmpty(String line) {
+  void printIfNotEmpty(PrintStream outStream, String line) {
     if (!line.isEmpty()) {
-      out.println(line);
+      outStream.println(line);
     }
   }
 
-  private void outputINodes(InputStream in) throws IOException {
-    INodeSection s = INodeSection.parseDelimitedFrom(in);
-    LOG.info("Found {} INodes in the INode section", s.getNumInodes());
+  private int outputINodes(InputStream in, PrintStream outStream)
+      throws IOException {
     long ignored = 0;
     long ignoredSnapshots = 0;
-    for (int i = 0; i < s.getNumInodes(); ++i) {
+    // The input stream is a LimitInputStream, so parseDelimitedFrom returns
+    // null once the end of this section's data is reached.
+    int count = 0;
+    while (true) {
       INode p = INode.parseDelimitedFrom(in);
+      if (p == null) {
+        break;
+      }
       try {
         String parentPath = metadataMap.getParentPath(p.getId());
-        printIfNotEmpty(getEntry(parentPath, p));
+        printIfNotEmpty(outStream, getEntry(parentPath, p));
       } catch (IOException ioe) {
         ignored++;
         if (!(ioe instanceof IgnoreSnapshotException)) {
@@ -790,16 +922,16 @@ abstract class PBImageTextWriter implements Closeable {
           }
         }
       }
-
-      if (LOG.isDebugEnabled() && i % 100000 == 0) {
-        LOG.debug("Outputted {} INodes.", i);
+      count++;
+      if (LOG.isDebugEnabled() && count % 100000 == 0) {
+        LOG.debug("Outputted {} INodes.", count);
       }
     }
     if (ignored > 0) {
       LOG.warn("Ignored {} nodes, including {} in snapshots. Please turn on"
               + " debug log for details", ignored, ignoredSnapshots);
     }
-    LOG.info("Outputted {} INodes.", s.getNumInodes());
+    return count;
   }
 
   private static IgnoreSnapshotException createIgnoredSnapshotException(
@@ -822,4 +954,79 @@ abstract class PBImageTextWriter implements Closeable {
     }
     return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
   }
+
+  private ArrayList<FileSummary.Section> getINodeSubSections(
+      ArrayList<FileSummary.Section> sections) {
+    ArrayList<FileSummary.Section> subSections = new ArrayList<>();
+    Iterator<FileSummary.Section> iter = sections.iterator();
+    while (iter.hasNext()) {
+      FileSummary.Section s = iter.next();
+      if (SectionName.fromString(s.getName()) == SectionName.INODE_SUB) {
+        subSections.add(s);
+      }
+    }
+    return subSections;
+  }
+
+  /**
+   * Given an FSImage FileSummary.Section, return a LimitInputStream set to
+   * the starting position of the section and limited to the section length.
+   * @param section The FileSummary.Section containing the offset and length
+   * @param compressionCodec The compression codec in use, if any
+   * @return An InputStream for the given section
+   * @throws IOException
+   */
+  private InputStream getInputStreamForSection(FileSummary.Section section,
+      String compressionCodec, Configuration conf)
+      throws IOException {
+    // A RandomAccessFile channel is not thread-safe; open a new FileInputStream per task.
+    FileInputStream fin = new FileInputStream(filename);
+    try {
+      FileChannel channel = fin.getChannel();
+      channel.position(section.getOffset());
+      InputStream in = new BufferedInputStream(new LimitInputStream(fin,
+          section.getLength()));
+
+      in = FSImageUtil.wrapInputStreamForCompression(conf,
+          compressionCodec, in);
+      return in;
+    } catch (IOException e) {
+      fin.close();
+      throw e;
+    }
+  }
+
+  /**
+   * @param srcPaths Source files of contents to be merged
+   * @param resultPath Merged file path
+   * @throws IOException
+   */
+  public static void mergeFiles(String[] srcPaths, String resultPath)
+      throws IOException {
+    if (srcPaths == null || srcPaths.length < 1) {
+      LOG.warn("no source files to merge.");
+      return;
+    }
+
+    File[] files = new File[srcPaths.length];
+    for (int i = 0; i < srcPaths.length; i++) {
+      files[i] = new File(srcPaths[i]);
+    }
+
+    File resultFile = new File(resultPath);
+    try (FileChannel resultChannel =
+             new FileOutputStream(resultFile, true).getChannel()) {
+      for (File file : files) {
+        try (FileChannel src = new FileInputStream(file).getChannel()) {
+          resultChannel.transferFrom(src, resultChannel.size(), src.size());
+        }
+      }
+    }
+
+    for (File file : files) {
+      if (!file.delete() && file.exists()) {
+        LOG.warn("delete tmp file: {} returned false", file);
+      }
+    }
+  }
 }
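
Taken together, outputInParallel and mergeFiles implement a fan-out/merge:
each INODE_SUB section is parsed by one task into its own tmp file, a latch
waits for every task, and the tmp files are then concatenated in section
order so the merged result matches the serial output byte for byte. A
stripped-down sketch of that pattern (class and method names here are
illustrative, not part of the patch):

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.nio.channels.FileChannel;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class FanOutMerge {
      static void run(List<Runnable> sectionTasks, String[] tmpPaths,
          String resultPath, int nThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        CountDownLatch latch = new CountDownLatch(sectionTasks.size());
        for (Runnable task : sectionTasks) {
          pool.submit(() -> {
            try {
              task.run();          // each task writes one tmp file
            } finally {
              latch.countDown();   // released even if the task fails
            }
          });
        }
        latch.await();             // wait for all sections
        pool.shutdown();
        // Concatenate the tmp files in section order; transferFrom
        // appends at the current end of the result file.
        try (FileChannel out =
                 new FileOutputStream(resultPath, true).getChannel()) {
          for (String path : tmpPaths) {
            try (FileChannel in = new FileInputStream(path).getChannel()) {
              out.transferFrom(in, out.size(), in.size());
            }
          }
        }
      }
    }
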
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
index 53031bc..9878469 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
@@ -83,8 +83,10 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
 import org.apache.hadoop.net.NetUtils;
@@ -122,6 +124,7 @@ import static org.apache.hadoop.fs.permission.AclEntryType.USER;
 import static org.apache.hadoop.fs.permission.FsAction.ALL;
 import static org.apache.hadoop.fs.permission.FsAction.EXECUTE;
 import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
+import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
 import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
 import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_NAME;
 import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_POLICY;
@@ -186,6 +189,12 @@ public class TestOfflineImageViewer {
       conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
           "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
+      // Save the fsimage with sub-sections so the parallel oiv path is exercised.
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true");
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1");
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4");
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4");
+
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
       cluster.waitActive();
       DistributedFileSystem hdfs = cluster.getFileSystem();
@@ -792,6 +801,13 @@ public class TestOfflineImageViewer {
   }
 
   @Test
+  public void testParallelPBDelimitedWriter() throws Exception {
+    testParallelPBDelimitedWriter("");  // Test in memory db.
+    testParallelPBDelimitedWriter(new FileSystemTestHelper().getTestRootDir()
+        + "/parallel-delimited.db");
+  }
+
+  @Test
   public void testCorruptionOutputEntryBuilder() throws IOException {
     PBImageCorruptionDetector corrDetector =
         new PBImageCorruptionDetector(null, ",", "");
@@ -882,11 +898,10 @@ public class TestOfflineImageViewer {
     final String DELIMITER = "\t";
     ByteArrayOutputStream output = new ByteArrayOutputStream();
 
-    try (PrintStream o = new PrintStream(output);
-        RandomAccessFile r = new RandomAccessFile(originalFsimage, "r")) {
+    try (PrintStream o = new PrintStream(output)) {
       PBImageDelimitedTextWriter v =
           new PBImageDelimitedTextWriter(o, DELIMITER, db);
-      v.visit(r);
+      v.visit(originalFsimage.getAbsolutePath());
     }
 
     Set<String> fileNames = new HashSet<>();
@@ -920,6 +935,37 @@ public class TestOfflineImageViewer {
     assertEquals(writtenFiles.keySet(), fileNames);
   }
 
+  private void testParallelPBDelimitedWriter(String db) throws Exception {
+    String delimiter = "\t";
+    int numThreads = 4;
+
+    File parallelDelimitedOut = new File(tempDir, "parallelDelimitedOut");
+    if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
+        "-i", originalFsimage.getAbsolutePath(),
+        "-o", parallelDelimitedOut.getAbsolutePath(),
+        "-delimiter", delimiter,
+        "-t", db,
+        "-m", String.valueOf(numThreads)}) != 0) {
+      throw new IOException("oiv returned failure outputting in parallel.");
+    }
+    MD5Hash parallelMd5 = MD5FileUtils.computeMd5ForFile(parallelDelimitedOut);
+
+    File serialDelimitedOut = new File(tempDir, "serialDelimitedOut");
+    if (db != "") {
+      db = db + "/../serial.db";
+    }
+    if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
+        "-i", originalFsimage.getAbsolutePath(),
+        "-o", serialDelimitedOut.getAbsolutePath(),
+        "-t", db,
+        "-delimiter", delimiter}) != 0) {
+      throw new IOException("oiv returned failure outputting in serial.");
+    }
+    MD5Hash serialMd5 = MD5FileUtils.computeMd5ForFile(serialDelimitedOut);
+
+    assertEquals(parallelMd5, serialMd5);
+  }
+
   private void testPBCorruptionDetector(String db)
       throws IOException, InterruptedException {
     final String delimiter = "\t";
@@ -928,7 +974,7 @@ public class TestOfflineImageViewer {
     try (PrintStream o = new PrintStream(output)) {
       PBImageCorruptionDetector v =
           new PBImageCorruptionDetector(o, delimiter, db);
-      v.visit(new RandomAccessFile(originalFsimage, "r"));
+      v.visit(originalFsimage.getAbsolutePath());
     }
 
     try (
@@ -1024,7 +1070,7 @@ public class TestOfflineImageViewer {
     try (PrintStream o = new PrintStream(output)) {
       PBImageCorruptionDetector v =
           new PBImageCorruptionDetector(o, ",", db);
-      v.visit(new RandomAccessFile(corruptedImage, "r"));
+      v.visit(corruptedImage.getAbsolutePath());
     }
     return output.toString();
   }
@@ -1212,6 +1258,9 @@ public class TestOfflineImageViewer {
   public void testFileDistributionCalculatorForException() throws Exception {
     File fsimageFile = null;
     Configuration conf = new Configuration();
+    // Use a randomized base dir; reusing the shared cluster dir would clear
+    // the global originalFsimage file.
+    conf.set(HDFS_MINIDFS_BASEDIR, GenericTestUtils.getRandomizedTempPath());
     HashMap<String, FileStatus> files = Maps.newHashMap();
 
     // Create a initial fsimage file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java
index 4955846..b23ddf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java
@@ -239,7 +239,7 @@ public class TestOfflineImageViewerForAcl {
     try (PrintStream o = new PrintStream(output)) {
       PBImageDelimitedTextWriter v =
           new PBImageDelimitedTextWriter(o, DELIMITER, "");  // run in memory.
-      v.visit(new RandomAccessFile(originalFsimage, "r"));
+      v.visit(originalFsimage.getAbsolutePath());
     }
 
     try (
