You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2022/03/23 03:35:09 UTC

[hadoop] branch trunk updated (81879eb -> ef8bff0)

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 81879eb  HDFS-16471. Make HDFS ls tool cross platform (#4086)
     new 26ba384  Revert "HDFS-14617. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang."
     new ef8bff0  HDFS-15987. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


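Summary of changes: (none listed; the two commits below carry mirrored diffstats, so the second appears to re-apply exactly what the first reverts, leaving no net file changes)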

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: Revert "HDFS-14617. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang."

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 26ba3846cc84fa3f4a4de37920ae5aae72584af1
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Wed Mar 23 11:01:35 2022 +0800

    Revert "HDFS-14617. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang."
    
    This reverts commit 88975496d8a076b8923999e9e9ecef13e3721e3d.
---
 .../offlineImageViewer/OfflineImageViewerPB.java   |  10 +-
 .../PBImageCorruptionDetector.java                 |   2 +-
 .../PBImageDelimitedTextWriter.java                |   8 +-
 .../offlineImageViewer/PBImageTextWriter.java      | 267 +++------------------
 .../offlineImageViewer/TestOfflineImageViewer.java |  59 +----
 .../TestOfflineImageViewerForAcl.java              |   2 +-
 6 files changed, 42 insertions(+), 306 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
index 05e687a..dbcb452 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
@@ -107,7 +107,6 @@ public class OfflineImageViewerPB {
       + "                       Delimited outputs. If not set, the processor\n"
       + "                       constructs the namespace in memory \n"
       + "                       before outputting text.\n"
-      + "-m,--multiThread <arg> Use multiThread to process sub-sections.\n"
       + "-h,--help              Display usage information and exit\n";
 
   /**
@@ -133,7 +132,6 @@ public class OfflineImageViewerPB {
     options.addOption("delimiter", true, "");
     options.addOption("sp", false, "");
     options.addOption("t", "temp", true, "");
-    options.addOption("m", "multiThread", true, "");
 
     return options;
   }
@@ -187,7 +185,6 @@ public class OfflineImageViewerPB {
     String delimiter = cmd.getOptionValue("delimiter",
         PBImageTextWriter.DEFAULT_DELIMITER);
     String tempPath = cmd.getOptionValue("t", "");
-    int threads = Integer.parseInt(cmd.getOptionValue("m", "1"));
 
     Configuration conf = new Configuration();
     PrintStream out = null;
@@ -230,14 +227,15 @@ public class OfflineImageViewerPB {
         boolean printStoragePolicy = cmd.hasOption("sp");
         try (PBImageDelimitedTextWriter writer =
             new PBImageDelimitedTextWriter(out, delimiter,
-                tempPath, printStoragePolicy, threads, outputFile)) {
-          writer.visit(inputFile);
+                tempPath, printStoragePolicy);
+            RandomAccessFile r = new RandomAccessFile(inputFile, "r")) {
+          writer.visit(r);
         }
         break;
       case "DETECTCORRUPTION":
         try (PBImageCorruptionDetector detector =
             new PBImageCorruptionDetector(out, delimiter, tempPath)) {
-          detector.visit(inputFile);
+          detector.visit(new RandomAccessFile(inputFile, "r"));
         }
         break;
       default:
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java
index 1759386..28c4507 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java
@@ -337,7 +337,7 @@ public class PBImageCorruptionDetector extends PBImageTextWriter {
         if (parentId != -1) {
           entryBuilder.setParentId(parentId);
         }
-        printIfNotEmpty(serialOutStream(), entryBuilder.build());
+        printIfNotEmpty(entryBuilder.build());
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java
index 3e080ec..45d42f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java
@@ -146,13 +146,7 @@ public class PBImageDelimitedTextWriter extends PBImageTextWriter {
   PBImageDelimitedTextWriter(PrintStream out, String delimiter,
                              String tempPath, boolean printStoragePolicy)
       throws IOException {
-    this(out, delimiter, tempPath, printStoragePolicy, 1, "-");
-  }
-
-  PBImageDelimitedTextWriter(PrintStream out, String delimiter,
-      String tempPath, boolean printStoragePolicy, int threads,
-      String parallelOut) throws IOException {
-    super(out, delimiter, tempPath, threads, parallelOut);
+    super(out, delimiter, tempPath);
     this.printStoragePolicy = printStoragePolicy;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
index 2dab44a..08fe7fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
@@ -21,25 +21,17 @@ import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -463,22 +455,20 @@ abstract class PBImageTextWriter implements Closeable {
         return "/";
       }
       long parent = getFromDirChildMap(inode);
-      byte[] bytes = dirMap.get(toBytes(parent));
-      synchronized (this) {
-        if (!dirPathCache.containsKey(parent)) {
-          if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
-            // The parent is an INodeReference, which is generated from snapshot.
-            // For delimited oiv tool, no need to print out metadata in snapshots.
-            throw PBImageTextWriter.createIgnoredSnapshotException(inode);
-          }
-          String parentName = toString(bytes);
-          String parentPath =
-              new Path(getParentPath(parent),
-                  parentName.isEmpty() ? "/" : parentName).toString();
-          dirPathCache.put(parent, parentPath);
+      if (!dirPathCache.containsKey(parent)) {
+        byte[] bytes = dirMap.get(toBytes(parent));
+        if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
+          // The parent is an INodeReference, which is generated from snapshot.
+          // For delimited oiv tool, no need to print out metadata in snapshots.
+          throw PBImageTextWriter.createIgnoredSnapshotException(inode);
         }
-        return dirPathCache.get(parent);
+        String parentName = toString(bytes);
+        String parentPath =
+            new Path(getParentPath(parent),
+                parentName.isEmpty() ? "/" : parentName).toString();
+        dirPathCache.put(parent, parentPath);
       }
+      return dirPathCache.get(parent);
     }
 
     @Override
@@ -503,12 +493,9 @@ abstract class PBImageTextWriter implements Closeable {
   }
 
   private SerialNumberManager.StringTable stringTable;
-  private final PrintStream out;
+  private PrintStream out;
   private MetadataMap metadataMap = null;
   private String delimiter;
-  private File filename;
-  private int numThreads;
-  private String parallelOutputFile;
 
   /**
    * Construct a PB FsImage writer to generate text file.
@@ -516,8 +503,8 @@ abstract class PBImageTextWriter implements Closeable {
    * @param tempPath the path to store metadata. If it is empty, store metadata
    *                 in memory instead.
    */
-  PBImageTextWriter(PrintStream out, String delimiter, String tempPath,
-      int numThreads, String parallelOutputFile) throws IOException {
+  PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
+      throws IOException {
     this.out = out;
     this.delimiter = delimiter;
     if (tempPath.isEmpty()) {
@@ -525,17 +512,6 @@ abstract class PBImageTextWriter implements Closeable {
     } else {
       metadataMap = new LevelDBMetadataMap(tempPath);
     }
-    this.numThreads = numThreads;
-    this.parallelOutputFile = parallelOutputFile;
-  }
-
-  PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
-      throws IOException {
-    this(out, delimiter, tempPath, 1, "-");
-  }
-
-  protected PrintStream serialOutStream() {
-    return out;
   }
 
   @Override
@@ -586,9 +562,7 @@ abstract class PBImageTextWriter implements Closeable {
    */
   abstract protected void afterOutput() throws IOException;
 
-  public void visit(String filePath) throws IOException {
-    filename = new File(filePath);
-    RandomAccessFile file = new RandomAccessFile(filePath, "r");
+  public void visit(RandomAccessFile file) throws IOException {
     Configuration conf = new Configuration();
     if (!FSImageUtil.checkFileFormat(file)) {
       throw new IOException("Unrecognized FSImage");
@@ -668,122 +642,21 @@ abstract class PBImageTextWriter implements Closeable {
   private void output(Configuration conf, FileSummary summary,
       FileInputStream fin, ArrayList<FileSummary.Section> sections)
       throws IOException {
-    ArrayList<FileSummary.Section> allINodeSubSections =
-        getINodeSubSections(sections);
-    if (numThreads > 1 && !parallelOutputFile.equals("-") &&
-        allINodeSubSections.size() > 1) {
-      outputInParallel(conf, summary, allINodeSubSections);
-    } else {
-      LOG.info("Serial output due to threads num: {}, parallel output file: {}, " +
-          "subSections: {}.", numThreads, parallelOutputFile, allINodeSubSections.size());
-      outputInSerial(conf, summary, fin, sections);
-    }
-  }
-
-  private void outputInSerial(Configuration conf, FileSummary summary,
-      FileInputStream fin, ArrayList<FileSummary.Section> sections)
-      throws IOException {
     InputStream is;
     long startTime = Time.monotonicNow();
-    serialOutStream().println(getHeader());
+    out.println(getHeader());
     for (FileSummary.Section section : sections) {
       if (SectionName.fromString(section.getName()) == SectionName.INODE) {
         fin.getChannel().position(section.getOffset());
         is = FSImageUtil.wrapInputStreamForCompression(conf,
             summary.getCodec(), new BufferedInputStream(new LimitInputStream(
                 fin, section.getLength())));
-        INodeSection s = INodeSection.parseDelimitedFrom(is);
-        LOG.info("Found {} INodes in the INode section", s.getNumInodes());
-        int count = outputINodes(is, serialOutStream());
-        LOG.info("Outputted {} INodes.", count);
+        outputINodes(is);
       }
     }
     afterOutput();
     long timeTaken = Time.monotonicNow() - startTime;
-    LOG.debug("Time to output inodes: {} ms", timeTaken);
-  }
-
-  /**
-   * STEP1: Multi-threaded process sub-sections.
-   * Given n (n>1) threads to process k (k>=n) sections,
-   * output parsed results of each section to tmp file in order.
-   * STEP2: Merge tmp files.
-   */
-  private void outputInParallel(Configuration conf, FileSummary summary,
-      ArrayList<FileSummary.Section> subSections)
-      throws IOException {
-    int nThreads = Integer.min(numThreads, subSections.size());
-    LOG.info("Outputting in parallel with {} sub-sections using {} threads",
-        subSections.size(), nThreads);
-    final CopyOnWriteArrayList<IOException> exceptions = new CopyOnWriteArrayList<>();
-    CountDownLatch latch = new CountDownLatch(subSections.size());
-    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
-    AtomicLong expectedINodes = new AtomicLong(0);
-    AtomicLong totalParsed = new AtomicLong(0);
-    String codec = summary.getCodec();
-    String[] paths = new String[subSections.size()];
-
-    for (int i = 0; i < subSections.size(); i++) {
-      paths[i] = parallelOutputFile + ".tmp." + i;
-      int index = i;
-      executorService.submit(() -> {
-        LOG.info("Output iNodes of section-{}", index);
-        InputStream is = null;
-        try (PrintStream outStream = new PrintStream(paths[index], "UTF-8")) {
-          long startTime = Time.monotonicNow();
-          is = getInputStreamForSection(subSections.get(index), codec, conf);
-          if (index == 0) {
-            // The first iNode section has a header which must be processed first
-            INodeSection s = INodeSection.parseDelimitedFrom(is);
-            expectedINodes.set(s.getNumInodes());
-          }
-          totalParsed.addAndGet(outputINodes(is, outStream));
-          long timeTaken = Time.monotonicNow() - startTime;
-          LOG.info("Time to output iNodes of section-{}: {} ms", index, timeTaken);
-        } catch (Exception e) {
-          exceptions.add(new IOException(e));
-        } finally {
-          latch.countDown();
-          try {
-            if (is != null) {
-              is.close();
-            }
-          } catch (IOException ioe) {
-            LOG.warn("Failed to close the input stream, ignoring", ioe);
-          }
-        }
-      });
-    }
-
-    try {
-      latch.await();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for countdown latch", e);
-      throw new IOException(e);
-    }
-
-    executorService.shutdown();
-    if (exceptions.size() != 0) {
-      LOG.error("Failed to output INode sub-sections, {} exception(s) occurred.",
-          exceptions.size());
-      throw exceptions.get(0);
-    }
-    if (totalParsed.get() != expectedINodes.get()) {
-      throw new IOException("Expected to parse " + expectedINodes + " in parallel, " +
-          "but parsed " + totalParsed.get() + ". The image may be corrupt.");
-    }
-    LOG.info("Completed outputting all INode sub-sections to {} tmp files.",
-        subSections.size());
-
-    try (PrintStream ps = new PrintStream(parallelOutputFile, "UTF-8")) {
-      ps.println(getHeader());
-    }
-
-    // merge tmp files
-    long startTime = Time.monotonicNow();
-    mergeFiles(paths, parallelOutputFile);
-    long timeTaken = Time.monotonicNow() - startTime;
-    LOG.info("Completed all stages. Time to merge files: {} ms", timeTaken);
+    LOG.debug("Time to output inodes: {}ms", timeTaken);
   }
 
   protected PermissionStatus getPermission(long perm) {
@@ -890,27 +763,22 @@ abstract class PBImageTextWriter implements Closeable {
     LOG.info("Scanned {} INode directories to build namespace.", count);
   }
 
-  void printIfNotEmpty(PrintStream outStream, String line) {
+  void printIfNotEmpty(String line) {
     if (!line.isEmpty()) {
-      outStream.println(line);
+      out.println(line);
     }
   }
 
-  private int outputINodes(InputStream in, PrintStream outStream)
-      throws IOException {
+  private void outputINodes(InputStream in) throws IOException {
+    INodeSection s = INodeSection.parseDelimitedFrom(in);
+    LOG.info("Found {} INodes in the INode section", s.getNumInodes());
     long ignored = 0;
     long ignoredSnapshots = 0;
-    // As the input stream is a LimitInputStream, the reading will stop when
-    // EOF is encountered at the end of the stream.
-    int count = 0;
-    while (true) {
+    for (int i = 0; i < s.getNumInodes(); ++i) {
       INode p = INode.parseDelimitedFrom(in);
-      if (p == null) {
-        break;
-      }
       try {
         String parentPath = metadataMap.getParentPath(p.getId());
-        printIfNotEmpty(outStream, getEntry(parentPath, p));
+        printIfNotEmpty(getEntry(parentPath, p));
       } catch (IOException ioe) {
         ignored++;
         if (!(ioe instanceof IgnoreSnapshotException)) {
@@ -922,16 +790,16 @@ abstract class PBImageTextWriter implements Closeable {
           }
         }
       }
-      count++;
-      if (LOG.isDebugEnabled() && count % 100000 == 0) {
-        LOG.debug("Outputted {} INodes.", count);
+
+      if (LOG.isDebugEnabled() && i % 100000 == 0) {
+        LOG.debug("Outputted {} INodes.", i);
       }
     }
     if (ignored > 0) {
       LOG.warn("Ignored {} nodes, including {} in snapshots. Please turn on"
               + " debug log for details", ignored, ignoredSnapshots);
     }
-    return count;
+    LOG.info("Outputted {} INodes.", s.getNumInodes());
   }
 
   private static IgnoreSnapshotException createIgnoredSnapshotException(
@@ -954,79 +822,4 @@ abstract class PBImageTextWriter implements Closeable {
     }
     return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
   }
-
-  private ArrayList<FileSummary.Section> getINodeSubSections(
-      ArrayList<FileSummary.Section> sections) {
-    ArrayList<FileSummary.Section> subSections = new ArrayList<>();
-    Iterator<FileSummary.Section> iter = sections.iterator();
-    while (iter.hasNext()) {
-      FileSummary.Section s = iter.next();
-      if (SectionName.fromString(s.getName()) == SectionName.INODE_SUB) {
-        subSections.add(s);
-      }
-    }
-    return subSections;
-  }
-
-  /**
-   * Given a FSImage FileSummary.section, return a LimitInput stream set to
-   * the starting position of the section and limited to the section length.
-   * @param section The FileSummary.Section containing the offset and length
-   * @param compressionCodec The compression codec in use, if any
-   * @return An InputStream for the given section
-   * @throws IOException
-   */
-  private InputStream getInputStreamForSection(FileSummary.Section section,
-      String compressionCodec, Configuration conf)
-      throws IOException {
-    // channel of RandomAccessFile is not thread safe, use File
-    FileInputStream fin = new FileInputStream(filename);
-    try {
-      FileChannel channel = fin.getChannel();
-      channel.position(section.getOffset());
-      InputStream in = new BufferedInputStream(new LimitInputStream(fin,
-          section.getLength()));
-
-      in = FSImageUtil.wrapInputStreamForCompression(conf,
-          compressionCodec, in);
-      return in;
-    } catch (IOException e) {
-      fin.close();
-      throw e;
-    }
-  }
-
-  /**
-   * @param srcPaths Source files of contents to be merged
-   * @param resultPath Merged file path
-   * @throws IOException
-   */
-  public static void mergeFiles(String[] srcPaths, String resultPath)
-      throws IOException {
-    if (srcPaths == null || srcPaths.length < 1) {
-      LOG.warn("no source files to merge.");
-      return;
-    }
-
-    File[] files = new File[srcPaths.length];
-    for (int i = 0; i < srcPaths.length; i++) {
-      files[i] = new File(srcPaths[i]);
-    }
-
-    File resultFile = new File(resultPath);
-    try (FileChannel resultChannel =
-             new FileOutputStream(resultFile, true).getChannel()) {
-      for (File file : files) {
-        try (FileChannel src = new FileInputStream(file).getChannel()) {
-          resultChannel.transferFrom(src, resultChannel.size(), src.size());
-        }
-      }
-    }
-
-    for (File file : files) {
-      if (!file.delete() && file.exists()) {
-        LOG.warn("delete tmp file: {} returned false", file);
-      }
-    }
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
index 9878469..53031bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
@@ -83,10 +83,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
-import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
 import org.apache.hadoop.net.NetUtils;
@@ -124,7 +122,6 @@ import static org.apache.hadoop.fs.permission.AclEntryType.USER;
 import static org.apache.hadoop.fs.permission.FsAction.ALL;
 import static org.apache.hadoop.fs.permission.FsAction.EXECUTE;
 import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
-import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
 import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
 import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_NAME;
 import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_POLICY;
@@ -189,12 +186,6 @@ public class TestOfflineImageViewer {
       conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
           "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
-      // fsimage with sub-section conf
-      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true");
-      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1");
-      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4");
-      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4");
-
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
       cluster.waitActive();
       DistributedFileSystem hdfs = cluster.getFileSystem();
@@ -801,13 +792,6 @@ public class TestOfflineImageViewer {
   }
 
   @Test
-  public void testParallelPBDelimitedWriter() throws Exception {
-    testParallelPBDelimitedWriter("");  // Test in memory db.
-    testParallelPBDelimitedWriter(new FileSystemTestHelper().getTestRootDir()
-        + "/parallel-delimited.db");
-  }
-
-  @Test
   public void testCorruptionOutputEntryBuilder() throws IOException {
     PBImageCorruptionDetector corrDetector =
         new PBImageCorruptionDetector(null, ",", "");
@@ -898,10 +882,11 @@ public class TestOfflineImageViewer {
     final String DELIMITER = "\t";
     ByteArrayOutputStream output = new ByteArrayOutputStream();
 
-    try (PrintStream o = new PrintStream(output)) {
+    try (PrintStream o = new PrintStream(output);
+        RandomAccessFile r = new RandomAccessFile(originalFsimage, "r")) {
       PBImageDelimitedTextWriter v =
           new PBImageDelimitedTextWriter(o, DELIMITER, db);
-      v.visit(originalFsimage.getAbsolutePath());
+      v.visit(r);
     }
 
     Set<String> fileNames = new HashSet<>();
@@ -935,37 +920,6 @@ public class TestOfflineImageViewer {
     assertEquals(writtenFiles.keySet(), fileNames);
   }
 
-  private void testParallelPBDelimitedWriter(String db) throws Exception{
-    String delimiter = "\t";
-    int numThreads = 4;
-
-    File parallelDelimitedOut = new File(tempDir, "parallelDelimitedOut");
-    if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
-        "-i", originalFsimage.getAbsolutePath(),
-        "-o", parallelDelimitedOut.getAbsolutePath(),
-        "-delimiter", delimiter,
-        "-t", db,
-        "-m", String.valueOf(numThreads)}) != 0) {
-      throw new IOException("oiv returned failure outputting in parallel.");
-    }
-    MD5Hash parallelMd5 = MD5FileUtils.computeMd5ForFile(parallelDelimitedOut);
-
-    File serialDelimitedOut = new File(tempDir, "serialDelimitedOut");
-    if (db != "") {
-      db = db + "/../serial.db";
-    }
-    if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
-        "-i", originalFsimage.getAbsolutePath(),
-        "-o", serialDelimitedOut.getAbsolutePath(),
-        "-t", db,
-        "-delimiter", delimiter}) != 0) {
-      throw new IOException("oiv returned failure outputting in serial.");
-    }
-    MD5Hash serialMd5 = MD5FileUtils.computeMd5ForFile(serialDelimitedOut);
-
-    assertEquals(parallelMd5, serialMd5);
-  }
-
   private void testPBCorruptionDetector(String db)
       throws IOException, InterruptedException {
     final String delimiter = "\t";
@@ -974,7 +928,7 @@ public class TestOfflineImageViewer {
     try (PrintStream o = new PrintStream(output)) {
       PBImageCorruptionDetector v =
           new PBImageCorruptionDetector(o, delimiter, db);
-      v.visit(originalFsimage.getAbsolutePath());
+      v.visit(new RandomAccessFile(originalFsimage, "r"));
     }
 
     try (
@@ -1070,7 +1024,7 @@ public class TestOfflineImageViewer {
     try (PrintStream o = new PrintStream(output)) {
       PBImageCorruptionDetector v =
           new PBImageCorruptionDetector(o, ",", db);
-      v.visit(corruptedImage.getAbsolutePath());
+      v.visit(new RandomAccessFile(corruptedImage, "r"));
     }
     return output.toString();
   }
@@ -1258,9 +1212,6 @@ public class TestOfflineImageViewer {
   public void testFileDistributionCalculatorForException() throws Exception {
     File fsimageFile = null;
     Configuration conf = new Configuration();
-    // Avoid using the same cluster dir to cause the global originalFsimage
-    // file to be cleared.
-    conf.set(HDFS_MINIDFS_BASEDIR, GenericTestUtils.getRandomizedTempPath());
     HashMap<String, FileStatus> files = Maps.newHashMap();
 
     // Create a initial fsimage file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java
index b23ddf4..4955846 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java
@@ -239,7 +239,7 @@ public class TestOfflineImageViewerForAcl {
     try (PrintStream o = new PrintStream(output)) {
       PBImageDelimitedTextWriter v =
           new PBImageDelimitedTextWriter(o, DELIMITER, "");  // run in memory.
-      v.visit(originalFsimage.getAbsolutePath());
+      v.visit(new RandomAccessFile(originalFsimage, "r"));
     }
 
     try (

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/02: HDFS-15987. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang.

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ef8bff0df9f2b1eca99f655e2f74f40d3f098a2e
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Wed Mar 23 11:32:12 2022 +0800

    HDFS-15987. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang.
    
    Signed-off-by: He Xiaoqiao <he...@apache.org>
---
 .../offlineImageViewer/OfflineImageViewerPB.java   |  10 +-
 .../PBImageCorruptionDetector.java                 |   2 +-
 .../PBImageDelimitedTextWriter.java                |   8 +-
 .../offlineImageViewer/PBImageTextWriter.java      | 267 ++++++++++++++++++---
 .../offlineImageViewer/TestOfflineImageViewer.java |  59 ++++-
 .../TestOfflineImageViewerForAcl.java              |   2 +-
 6 files changed, 306 insertions(+), 42 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
index dbcb452..05e687a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
@@ -107,6 +107,7 @@ public class OfflineImageViewerPB {
       + "                       Delimited outputs. If not set, the processor\n"
       + "                       constructs the namespace in memory \n"
       + "                       before outputting text.\n"
+      + "-m,--multiThread <arg> Use multiThread to process sub-sections.\n"
       + "-h,--help              Display usage information and exit\n";
 
   /**
@@ -132,6 +133,7 @@ public class OfflineImageViewerPB {
     options.addOption("delimiter", true, "");
     options.addOption("sp", false, "");
     options.addOption("t", "temp", true, "");
+    options.addOption("m", "multiThread", true, "");
 
     return options;
   }
@@ -185,6 +187,7 @@ public class OfflineImageViewerPB {
     String delimiter = cmd.getOptionValue("delimiter",
         PBImageTextWriter.DEFAULT_DELIMITER);
     String tempPath = cmd.getOptionValue("t", "");
+    int threads = Integer.parseInt(cmd.getOptionValue("m", "1"));
 
     Configuration conf = new Configuration();
     PrintStream out = null;
@@ -227,15 +230,14 @@ public class OfflineImageViewerPB {
         boolean printStoragePolicy = cmd.hasOption("sp");
         try (PBImageDelimitedTextWriter writer =
             new PBImageDelimitedTextWriter(out, delimiter,
-                tempPath, printStoragePolicy);
-            RandomAccessFile r = new RandomAccessFile(inputFile, "r")) {
-          writer.visit(r);
+                tempPath, printStoragePolicy, threads, outputFile)) {
+          writer.visit(inputFile);
         }
         break;
       case "DETECTCORRUPTION":
         try (PBImageCorruptionDetector detector =
             new PBImageCorruptionDetector(out, delimiter, tempPath)) {
-          detector.visit(new RandomAccessFile(inputFile, "r"));
+          detector.visit(inputFile);
         }
         break;
       default:
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java
index 28c4507..1759386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java
@@ -337,7 +337,7 @@ public class PBImageCorruptionDetector extends PBImageTextWriter {
         if (parentId != -1) {
           entryBuilder.setParentId(parentId);
         }
-        printIfNotEmpty(entryBuilder.build());
+        printIfNotEmpty(serialOutStream(), entryBuilder.build());
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java
index 45d42f0..3e080ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java
@@ -146,7 +146,13 @@ public class PBImageDelimitedTextWriter extends PBImageTextWriter {
   PBImageDelimitedTextWriter(PrintStream out, String delimiter,
                              String tempPath, boolean printStoragePolicy)
       throws IOException {
-    super(out, delimiter, tempPath);
+    this(out, delimiter, tempPath, printStoragePolicy, 1, "-");
+  }
+
+  PBImageDelimitedTextWriter(PrintStream out, String delimiter,
+      String tempPath, boolean printStoragePolicy, int threads,
+      String parallelOut) throws IOException {
+    super(out, delimiter, tempPath, threads, parallelOut);
     this.printStoragePolicy = printStoragePolicy;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
index 08fe7fb..2dab44a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
@@ -21,17 +21,25 @@ import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -455,20 +463,22 @@ abstract class PBImageTextWriter implements Closeable {
         return "/";
       }
       long parent = getFromDirChildMap(inode);
-      if (!dirPathCache.containsKey(parent)) {
-        byte[] bytes = dirMap.get(toBytes(parent));
-        if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
-          // The parent is an INodeReference, which is generated from snapshot.
-          // For delimited oiv tool, no need to print out metadata in snapshots.
-          throw PBImageTextWriter.createIgnoredSnapshotException(inode);
+      byte[] bytes = dirMap.get(toBytes(parent));
+      synchronized (this) {
+        if (!dirPathCache.containsKey(parent)) {
+          if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
+            // The parent is an INodeReference, which is generated from snapshot.
+            // For delimited oiv tool, no need to print out metadata in snapshots.
+            throw PBImageTextWriter.createIgnoredSnapshotException(inode);
+          }
+          String parentName = toString(bytes);
+          String parentPath =
+              new Path(getParentPath(parent),
+                  parentName.isEmpty() ? "/" : parentName).toString();
+          dirPathCache.put(parent, parentPath);
         }
-        String parentName = toString(bytes);
-        String parentPath =
-            new Path(getParentPath(parent),
-                parentName.isEmpty() ? "/" : parentName).toString();
-        dirPathCache.put(parent, parentPath);
+        return dirPathCache.get(parent);
       }
-      return dirPathCache.get(parent);
     }
 
     @Override
@@ -493,9 +503,12 @@ abstract class PBImageTextWriter implements Closeable {
   }
 
   private SerialNumberManager.StringTable stringTable;
-  private PrintStream out;
+  private final PrintStream out;
   private MetadataMap metadataMap = null;
   private String delimiter;
+  private File filename;
+  private int numThreads;
+  private String parallelOutputFile;
 
   /**
    * Construct a PB FsImage writer to generate text file.
@@ -503,8 +516,8 @@ abstract class PBImageTextWriter implements Closeable {
    * @param tempPath the path to store metadata. If it is empty, store metadata
    *                 in memory instead.
    */
-  PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
-      throws IOException {
+  PBImageTextWriter(PrintStream out, String delimiter, String tempPath,
+      int numThreads, String parallelOutputFile) throws IOException {
     this.out = out;
     this.delimiter = delimiter;
     if (tempPath.isEmpty()) {
@@ -512,6 +525,17 @@ abstract class PBImageTextWriter implements Closeable {
     } else {
       metadataMap = new LevelDBMetadataMap(tempPath);
     }
+    this.numThreads = numThreads;
+    this.parallelOutputFile = parallelOutputFile;
+  }
+
+  PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
+      throws IOException {
+    this(out, delimiter, tempPath, 1, "-");
+  }
+
+  protected PrintStream serialOutStream() {
+    return out;
   }
 
   @Override
@@ -562,7 +586,9 @@ abstract class PBImageTextWriter implements Closeable {
    */
   abstract protected void afterOutput() throws IOException;
 
-  public void visit(RandomAccessFile file) throws IOException {
+  public void visit(String filePath) throws IOException {
+    filename = new File(filePath);
+    RandomAccessFile file = new RandomAccessFile(filePath, "r");
     Configuration conf = new Configuration();
     if (!FSImageUtil.checkFileFormat(file)) {
       throw new IOException("Unrecognized FSImage");
@@ -642,21 +668,122 @@ abstract class PBImageTextWriter implements Closeable {
   private void output(Configuration conf, FileSummary summary,
       FileInputStream fin, ArrayList<FileSummary.Section> sections)
       throws IOException {
+    ArrayList<FileSummary.Section> allINodeSubSections =
+        getINodeSubSections(sections);
+    if (numThreads > 1 && !parallelOutputFile.equals("-") &&
+        allINodeSubSections.size() > 1) {
+      outputInParallel(conf, summary, allINodeSubSections);
+    } else {
+      LOG.info("Serial output due to threads num: {}, parallel output file: {}, " +
+          "subSections: {}.", numThreads, parallelOutputFile, allINodeSubSections.size());
+      outputInSerial(conf, summary, fin, sections);
+    }
+  }
+
+  private void outputInSerial(Configuration conf, FileSummary summary,
+      FileInputStream fin, ArrayList<FileSummary.Section> sections)
+      throws IOException {
     InputStream is;
     long startTime = Time.monotonicNow();
-    out.println(getHeader());
+    serialOutStream().println(getHeader());
     for (FileSummary.Section section : sections) {
       if (SectionName.fromString(section.getName()) == SectionName.INODE) {
         fin.getChannel().position(section.getOffset());
         is = FSImageUtil.wrapInputStreamForCompression(conf,
             summary.getCodec(), new BufferedInputStream(new LimitInputStream(
                 fin, section.getLength())));
-        outputINodes(is);
+        INodeSection s = INodeSection.parseDelimitedFrom(is);
+        LOG.info("Found {} INodes in the INode section", s.getNumInodes());
+        int count = outputINodes(is, serialOutStream());
+        LOG.info("Outputted {} INodes.", count);
       }
     }
     afterOutput();
     long timeTaken = Time.monotonicNow() - startTime;
-    LOG.debug("Time to output inodes: {}ms", timeTaken);
+    LOG.debug("Time to output inodes: {} ms", timeTaken);
+  }
+
+  /**
+   * STEP1: Process the sub-sections with multiple threads.
+   * Given n (n>1) threads and k (k>=n) sub-sections, write the parsed
+   * output of each sub-section to its own tmp file, numbered in order.
+   * STEP2: Merge the tmp files, in order, into the final output file.
+   */
+  private void outputInParallel(Configuration conf, FileSummary summary,
+      ArrayList<FileSummary.Section> subSections)
+      throws IOException {
+    int nThreads = Integer.min(numThreads, subSections.size());
+    LOG.info("Outputting in parallel with {} sub-sections using {} threads",
+        subSections.size(), nThreads);
+    final CopyOnWriteArrayList<IOException> exceptions = new CopyOnWriteArrayList<>();
+    CountDownLatch latch = new CountDownLatch(subSections.size());
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+    AtomicLong expectedINodes = new AtomicLong(0);
+    AtomicLong totalParsed = new AtomicLong(0);
+    String codec = summary.getCodec();
+    String[] paths = new String[subSections.size()];
+
+    for (int i = 0; i < subSections.size(); i++) {
+      paths[i] = parallelOutputFile + ".tmp." + i;
+      int index = i;
+      executorService.submit(() -> {
+        LOG.info("Output iNodes of section-{}", index);
+        InputStream is = null;
+        try (PrintStream outStream = new PrintStream(paths[index], "UTF-8")) {
+          long startTime = Time.monotonicNow();
+          is = getInputStreamForSection(subSections.get(index), codec, conf);
+          if (index == 0) {
+            // The first iNode section has a header which must be processed first
+            INodeSection s = INodeSection.parseDelimitedFrom(is);
+            expectedINodes.set(s.getNumInodes());
+          }
+          totalParsed.addAndGet(outputINodes(is, outStream));
+          long timeTaken = Time.monotonicNow() - startTime;
+          LOG.info("Time to output iNodes of section-{}: {} ms", index, timeTaken);
+        } catch (Exception e) {
+          exceptions.add(new IOException(e));
+        } finally {
+          latch.countDown();
+          try {
+            if (is != null) {
+              is.close();
+            }
+          } catch (IOException ioe) {
+            LOG.warn("Failed to close the input stream, ignoring", ioe);
+          }
+        }
+      });
+    }
+
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for countdown latch", e);
+      throw new IOException(e);
+    }
+
+    executorService.shutdown();
+    if (exceptions.size() != 0) {
+      LOG.error("Failed to output INode sub-sections, {} exception(s) occurred.",
+          exceptions.size());
+      throw exceptions.get(0);
+    }
+    if (totalParsed.get() != expectedINodes.get()) {
+      throw new IOException("Expected to parse " + expectedINodes + " in parallel, " +
+          "but parsed " + totalParsed.get() + ". The image may be corrupt.");
+    }
+    LOG.info("Completed outputting all INode sub-sections to {} tmp files.",
+        subSections.size());
+
+    try (PrintStream ps = new PrintStream(parallelOutputFile, "UTF-8")) {
+      ps.println(getHeader());
+    }
+
+    // merge tmp files
+    long startTime = Time.monotonicNow();
+    mergeFiles(paths, parallelOutputFile);
+    long timeTaken = Time.monotonicNow() - startTime;
+    LOG.info("Completed all stages. Time to merge files: {} ms", timeTaken);
   }
 
   protected PermissionStatus getPermission(long perm) {
@@ -763,22 +890,27 @@ abstract class PBImageTextWriter implements Closeable {
     LOG.info("Scanned {} INode directories to build namespace.", count);
   }
 
-  void printIfNotEmpty(String line) {
+  void printIfNotEmpty(PrintStream outStream, String line) {
     if (!line.isEmpty()) {
-      out.println(line);
+      outStream.println(line);
     }
   }
 
-  private void outputINodes(InputStream in) throws IOException {
-    INodeSection s = INodeSection.parseDelimitedFrom(in);
-    LOG.info("Found {} INodes in the INode section", s.getNumInodes());
+  private int outputINodes(InputStream in, PrintStream outStream)
+      throws IOException {
     long ignored = 0;
     long ignoredSnapshots = 0;
-    for (int i = 0; i < s.getNumInodes(); ++i) {
+    // As the input stream is a LimitInputStream, the reading will stop when
+    // EOF is encountered at the end of the stream.
+    int count = 0;
+    while (true) {
       INode p = INode.parseDelimitedFrom(in);
+      if (p == null) {
+        break;
+      }
       try {
         String parentPath = metadataMap.getParentPath(p.getId());
-        printIfNotEmpty(getEntry(parentPath, p));
+        printIfNotEmpty(outStream, getEntry(parentPath, p));
       } catch (IOException ioe) {
         ignored++;
         if (!(ioe instanceof IgnoreSnapshotException)) {
@@ -790,16 +922,16 @@ abstract class PBImageTextWriter implements Closeable {
           }
         }
       }
-
-      if (LOG.isDebugEnabled() && i % 100000 == 0) {
-        LOG.debug("Outputted {} INodes.", i);
+      count++;
+      if (LOG.isDebugEnabled() && count % 100000 == 0) {
+        LOG.debug("Outputted {} INodes.", count);
       }
     }
     if (ignored > 0) {
       LOG.warn("Ignored {} nodes, including {} in snapshots. Please turn on"
               + " debug log for details", ignored, ignoredSnapshots);
     }
-    LOG.info("Outputted {} INodes.", s.getNumInodes());
+    return count;
   }
 
   private static IgnoreSnapshotException createIgnoredSnapshotException(
@@ -822,4 +954,79 @@ abstract class PBImageTextWriter implements Closeable {
     }
     return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
   }
+
+  private ArrayList<FileSummary.Section> getINodeSubSections(
+      ArrayList<FileSummary.Section> sections) {
+    ArrayList<FileSummary.Section> subSections = new ArrayList<>();
+    Iterator<FileSummary.Section> iter = sections.iterator();
+    while (iter.hasNext()) {
+      FileSummary.Section s = iter.next();
+      if (SectionName.fromString(s.getName()) == SectionName.INODE_SUB) {
+        subSections.add(s);
+      }
+    }
+    return subSections;
+  }
+
+  /**
+   * Given an FSImage FileSummary.Section, return an InputStream positioned at
+   * the start of the section and limited to the section length.
+   * @param section The FileSummary.Section containing the offset and length
+   * @param compressionCodec The compression codec in use, if any
+   * @return An InputStream for the given section
+   * @throws IOException
+   */
+  private InputStream getInputStreamForSection(FileSummary.Section section,
+      String compressionCodec, Configuration conf)
+      throws IOException {
+    // RandomAccessFile's channel is not thread safe; open a new stream instead
+    FileInputStream fin = new FileInputStream(filename);
+    try {
+      FileChannel channel = fin.getChannel();
+      channel.position(section.getOffset());
+      InputStream in = new BufferedInputStream(new LimitInputStream(fin,
+          section.getLength()));
+
+      in = FSImageUtil.wrapInputStreamForCompression(conf,
+          compressionCodec, in);
+      return in;
+    } catch (IOException e) {
+      fin.close();
+      throw e;
+    }
+  }
+
+  /**
+   * @param srcPaths Source files of contents to be merged
+   * @param resultPath Merged file path
+   * @throws IOException
+   */
+  public static void mergeFiles(String[] srcPaths, String resultPath)
+      throws IOException {
+    if (srcPaths == null || srcPaths.length < 1) {
+      LOG.warn("no source files to merge.");
+      return;
+    }
+
+    File[] files = new File[srcPaths.length];
+    for (int i = 0; i < srcPaths.length; i++) {
+      files[i] = new File(srcPaths[i]);
+    }
+
+    File resultFile = new File(resultPath);
+    try (FileChannel resultChannel =
+             new FileOutputStream(resultFile, true).getChannel()) {
+      for (File file : files) {
+        try (FileChannel src = new FileInputStream(file).getChannel()) {
+          resultChannel.transferFrom(src, resultChannel.size(), src.size());
+        }
+      }
+    }
+
+    for (File file : files) {
+      if (!file.delete() && file.exists()) {
+        LOG.warn("delete tmp file: {} returned false", file);
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
index 53031bc..9878469 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
@@ -83,8 +83,10 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
 import org.apache.hadoop.net.NetUtils;
@@ -122,6 +124,7 @@ import static org.apache.hadoop.fs.permission.AclEntryType.USER;
 import static org.apache.hadoop.fs.permission.FsAction.ALL;
 import static org.apache.hadoop.fs.permission.FsAction.EXECUTE;
 import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
+import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
 import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
 import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_NAME;
 import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_POLICY;
@@ -186,6 +189,12 @@ public class TestOfflineImageViewer {
       conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
           "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
+      // fsimage with sub-section conf
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true");
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1");
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4");
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4");
+
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
       cluster.waitActive();
       DistributedFileSystem hdfs = cluster.getFileSystem();
@@ -792,6 +801,13 @@ public class TestOfflineImageViewer {
   }
 
   @Test
+  public void testParallelPBDelimitedWriter() throws Exception {
+    testParallelPBDelimitedWriter("");  // Test in memory db.
+    testParallelPBDelimitedWriter(new FileSystemTestHelper().getTestRootDir()
+        + "/parallel-delimited.db");
+  }
+
+  @Test
   public void testCorruptionOutputEntryBuilder() throws IOException {
     PBImageCorruptionDetector corrDetector =
         new PBImageCorruptionDetector(null, ",", "");
@@ -882,11 +898,10 @@ public class TestOfflineImageViewer {
     final String DELIMITER = "\t";
     ByteArrayOutputStream output = new ByteArrayOutputStream();
 
-    try (PrintStream o = new PrintStream(output);
-        RandomAccessFile r = new RandomAccessFile(originalFsimage, "r")) {
+    try (PrintStream o = new PrintStream(output)) {
       PBImageDelimitedTextWriter v =
           new PBImageDelimitedTextWriter(o, DELIMITER, db);
-      v.visit(r);
+      v.visit(originalFsimage.getAbsolutePath());
     }
 
     Set<String> fileNames = new HashSet<>();
@@ -920,6 +935,37 @@ public class TestOfflineImageViewer {
     assertEquals(writtenFiles.keySet(), fileNames);
   }
 
+  private void testParallelPBDelimitedWriter(String db) throws Exception{
+    String delimiter = "\t";
+    int numThreads = 4;
+
+    File parallelDelimitedOut = new File(tempDir, "parallelDelimitedOut");
+    if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
+        "-i", originalFsimage.getAbsolutePath(),
+        "-o", parallelDelimitedOut.getAbsolutePath(),
+        "-delimiter", delimiter,
+        "-t", db,
+        "-m", String.valueOf(numThreads)}) != 0) {
+      throw new IOException("oiv returned failure outputting in parallel.");
+    }
+    MD5Hash parallelMd5 = MD5FileUtils.computeMd5ForFile(parallelDelimitedOut);
+
+    File serialDelimitedOut = new File(tempDir, "serialDelimitedOut");
+    if (db != "") {
+      db = db + "/../serial.db";
+    }
+    if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
+        "-i", originalFsimage.getAbsolutePath(),
+        "-o", serialDelimitedOut.getAbsolutePath(),
+        "-t", db,
+        "-delimiter", delimiter}) != 0) {
+      throw new IOException("oiv returned failure outputting in serial.");
+    }
+    MD5Hash serialMd5 = MD5FileUtils.computeMd5ForFile(serialDelimitedOut);
+
+    assertEquals(parallelMd5, serialMd5);
+  }
+
   private void testPBCorruptionDetector(String db)
       throws IOException, InterruptedException {
     final String delimiter = "\t";
@@ -928,7 +974,7 @@ public class TestOfflineImageViewer {
     try (PrintStream o = new PrintStream(output)) {
       PBImageCorruptionDetector v =
           new PBImageCorruptionDetector(o, delimiter, db);
-      v.visit(new RandomAccessFile(originalFsimage, "r"));
+      v.visit(originalFsimage.getAbsolutePath());
     }
 
     try (
@@ -1024,7 +1070,7 @@ public class TestOfflineImageViewer {
     try (PrintStream o = new PrintStream(output)) {
       PBImageCorruptionDetector v =
           new PBImageCorruptionDetector(o, ",", db);
-      v.visit(new RandomAccessFile(corruptedImage, "r"));
+      v.visit(corruptedImage.getAbsolutePath());
     }
     return output.toString();
   }
@@ -1212,6 +1258,9 @@ public class TestOfflineImageViewer {
   public void testFileDistributionCalculatorForException() throws Exception {
     File fsimageFile = null;
     Configuration conf = new Configuration();
+    // Use a separate cluster dir so that the global originalFsimage file
+    // is not cleared.
+    conf.set(HDFS_MINIDFS_BASEDIR, GenericTestUtils.getRandomizedTempPath());
     HashMap<String, FileStatus> files = Maps.newHashMap();
 
     // Create a initial fsimage file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java
index 4955846..b23ddf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java
@@ -239,7 +239,7 @@ public class TestOfflineImageViewerForAcl {
     try (PrintStream o = new PrintStream(output)) {
       PBImageDelimitedTextWriter v =
           new PBImageDelimitedTextWriter(o, DELIMITER, "");  // run in memory.
-      v.visit(new RandomAccessFile(originalFsimage, "r"));
+      v.visit(originalFsimage.getAbsolutePath());
     }
 
     try (

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org