Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2021/03/09 19:29:42 UTC

[GitHub] [hadoop] steveloughran commented on a change in pull request #2732: HADOOP-17531. DistCp: Reduce memory usage on copying huge directories.

steveloughran commented on a change in pull request #2732:
URL: https://github.com/apache/hadoop/pull/2732#discussion_r590624549



##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {

Review comment:
       You can move to SLF4J logging here; this is all commons-logging-era code (distcp lagged).
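   A minimal sketch of the SLF4J idiom, assuming the class swaps its commons-logging `Log` for an `org.slf4j.Logger` (the holder class name here is illustrative):
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class Slf4jSketch {  // hypothetical holder, for illustration only
     private static final Logger LOG =
         LoggerFactory.getLogger(Slf4jSketch.class);

     void logWorkerCount(int numListstatusThreads) {
       // {} placeholders are only rendered when debug is enabled,
       // so the isDebugEnabled() guard can be dropped.
       LOG.debug("Starting thread pool of {} listStatus workers.",
           numListstatusThreads);
     }
   }
   ```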

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.tools;
 
+import org.apache.hadoop.fs.RemoteIterator;

Review comment:
       This needs to go into the "real hadoop imports" block; your IDE is getting confused. Putting it in the right place makes backporting way easier.
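   For illustration, `RemoteIterator` would sit with its `org.apache.hadoop.fs` peers rather than at the top of the file; a sketch of the grouping (the exact neighbours depend on the file):
   ```java
   import java.io.IOException;
   import java.util.ArrayList;

   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.fs.RemoteIterator;  // <- belongs here
   import org.apache.hadoop.io.SequenceFile;
   ```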

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      for (int i = 0; i < numListstatusThreads; i++) {
+        workers.addWorker(
+            new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                excludeList));
+      }
+
+      for (FileStatus status : sourceDirs) {
+        workers.put(new WorkRequest<FileStatus>(status, 0));
+      }
+
+      while (workers.hasWork()) {
+        try {
+          WorkReport<FileStatus[]> workResult = workers.take();
+          int retry = workResult.getRetry();
+          for (FileStatus child : workResult.getItem()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Recording source-path: " + child.getPath() + " for copy.");
+            }
+            if (workResult.getSuccess()) {
+              LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                  DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                      preserveAcls && child.isDirectory(),
+                      preserveXAttrs && child.isDirectory(),
+                      preserveRawXattrs && child.isDirectory(),
+                      context.getBlocksPerChunk());
+
+              for (CopyListingFileStatus fs : childCopyListingStatus) {
+                if (randomizeFileListing) {
+                  addToFileListing(fileStatuses,
+                      new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+                } else {
+                  writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                }
+              }
+            }
+            if (retry < maxRetries) {
+              if (child.isDirectory()) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Traversing into source dir: " + child.getPath());
+                }
+                workers.put(new WorkRequest<FileStatus>(child, retry));
+              }
+            } else {
+              LOG.error("Giving up on " + child.getPath() + " after " + retry
+                  + " retries.");
+            }
+          }
+        } catch (InterruptedException ie) {
+          LOG.error("Could not get item from childQueue. Retrying...");
+        }
+      }
+      workers.shutdown();

Review comment:
       should this be in a finally clause?
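   A sketch of that restructuring, against the loop above:
   ```java
   ProducerConsumer<FileStatus, FileStatus[]> workers =
       new ProducerConsumer<>(numListstatusThreads);
   try {
     // ... addWorker(), put() and the take() loop from above ...
   } finally {
     // Runs even if take(), toCopyListingFileStatus() or the listing
     // writes throw, so the worker threads never leak.
     workers.shutdown();
   }
   ```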

##########
File path: hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
##########
@@ -217,4 +221,42 @@ public void testPreserveEC() throws Exception {
     assertTrue("/dest/dir1/subdir1 is not erasure coded!",
         destSubDir1Status.isErasureCoded());
   }
+
+  @Test
+  public void testUseIterator() throws Exception {

Review comment:
       Could we have this, or a variant of this, in AbstractContractDistCpTest so that the object stores will run it?

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
##########
@@ -239,7 +239,12 @@
    */
   DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE,
       new Option("direct", false, "Write files directly to the"
-          + " target location, avoiding temporary file rename."));
+          + " target location, avoiding temporary file rename.")),
+
+  USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR,
+      new Option("useIterator", false,

Review comment:
       Could we have a non-mixed-case arg? I always get confused here.
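   Purely as an illustration (the flag spelling is hypothetical, and the description is elided as in the diff above):
   ```java
   USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR,
       new Option("useiterator", false,
           /* description as in the patch */ "...")),
   ```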

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      for (int i = 0; i < numListstatusThreads; i++) {
+        workers.addWorker(
+            new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                excludeList));
+      }
+
+      for (FileStatus status : sourceDirs) {
+        workers.put(new WorkRequest<FileStatus>(status, 0));
+      }
+
+      while (workers.hasWork()) {
+        try {
+          WorkReport<FileStatus[]> workResult = workers.take();
+          int retry = workResult.getRetry();
+          for (FileStatus child : workResult.getItem()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Recording source-path: " + child.getPath() + " for copy.");
+            }
+            if (workResult.getSuccess()) {
+              LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                  DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                      preserveAcls && child.isDirectory(),
+                      preserveXAttrs && child.isDirectory(),
+                      preserveRawXattrs && child.isDirectory(),
+                      context.getBlocksPerChunk());
+
+              for (CopyListingFileStatus fs : childCopyListingStatus) {
+                if (randomizeFileListing) {
+                  addToFileListing(fileStatuses,
+                      new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+                } else {
+                  writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                }
+              }
+            }
+            if (retry < maxRetries) {
+              if (child.isDirectory()) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Traversing into source dir: " + child.getPath());
+                }
+                workers.put(new WorkRequest<FileStatus>(child, retry));
+              }
+            } else {
+              LOG.error("Giving up on " + child.getPath() + " after " + retry
+                  + " retries.");
+            }
+          }
+        } catch (InterruptedException ie) {
+          LOG.error("Could not get item from childQueue. Retrying...");
+        }
+      }
+      workers.shutdown();
+    }
+
+    private void traverseDirectoryLegacy() throws IOException {
+      Stack<FileStatus> pathStack = new Stack<FileStatus>();
+      for (FileStatus fs : sourceDirs) {
+        if (excludeList == null || !excludeList
+            .contains(fs.getPath().toUri().getPath())) {
+          pathStack.add(fs);
+        }
+      }
+      while (!pathStack.isEmpty()) {
+        prepareListing(pathStack.pop().getPath());
+      }
+    }
+
+    private void prepareListing(Path path) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recording source-path: " + path + " for copy.");
+      }
+      RemoteIterator<FileStatus> listStatus = sourceFS.listStatusIterator(path);
+      while (listStatus.hasNext()) {
+        FileStatus child = listStatus.next();
+        if (excludeList != null && excludeList

Review comment:
       `RemoteIterators.filteringRemoteIterator()` would let you do this in an elegant functional style; it's just not backportable to branches without IOStatistics support.
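   A sketch with that helper, assuming `org.apache.hadoop.util.functional.RemoteIterators` is available (trunk-only, per the caveat above); the exclusion check moves into the iterator itself:
   ```java
   import org.apache.hadoop.util.functional.RemoteIterators;

   RemoteIterator<FileStatus> listStatus =
       RemoteIterators.filteringRemoteIterator(
           sourceFS.listStatusIterator(path),
           st -> excludeList == null
               || !excludeList.contains(st.getPath().toUri().getPath()));
   while (listStatus.hasNext()) {
     FileStatus child = listStatus.next();
     // excluded paths never surface here
   }
   ```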

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {

Review comment:
       It'd be really good to have DurationInfo logging of the time taken to collect the listings; that will help in deciding which strategy to use and which option to set.
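   A sketch of what that could look like; it assumes the SLF4J migration suggested earlier, since `DurationInfo` takes an `org.slf4j.Logger`:
   ```java
   import org.apache.hadoop.util.DurationInfo;

   public void traverseDirectory() throws IOException {
     try (DurationInfo ignored = new DurationInfo(LOG,
         "Building listing for %s", sourcePathRoot)) {
       if (context.shouldUseIterator()) {
         traverseDirectoryLegacy();
       } else {
         traverseDirectoryMultiThreaded();
       }
     } // elapsed time is logged when the block closes
   }
   ```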

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      for (int i = 0; i < numListstatusThreads; i++) {
+        workers.addWorker(
+            new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                excludeList));
+      }
+
+      for (FileStatus status : sourceDirs) {
+        workers.put(new WorkRequest<FileStatus>(status, 0));
+      }
+
+      while (workers.hasWork()) {
+        try {
+          WorkReport<FileStatus[]> workResult = workers.take();
+          int retry = workResult.getRetry();
+          for (FileStatus child : workResult.getItem()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Recording source-path: " + child.getPath() + " for copy.");
+            }
+            if (workResult.getSuccess()) {
+              LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                  DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                      preserveAcls && child.isDirectory(),

Review comment:
       `child.isDirectory()` is called often enough that it could go into a variable.
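   e.g. hoisted once per child:
   ```java
   final boolean isDirectory = child.isDirectory();
   LinkedList<CopyListingFileStatus> childCopyListingStatus =
       DistCpUtils.toCopyListingFileStatus(sourceFS, child,
           preserveAcls && isDirectory,
           preserveXAttrs && isDirectory,
           preserveRawXattrs && isDirectory,
           context.getBlocksPerChunk());
   ```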

##########
File path: hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
##########
@@ -217,4 +221,42 @@ public void testPreserveEC() throws Exception {
     assertTrue("/dest/dir1/subdir1 is not erasure coded!",
         destSubDir1Status.isErasureCoded());
   }
+
+  @Test
+  public void testUseIterator() throws Exception {
+    Path source = new Path("/src");
+    Path dest = new Path("/dest");
+    fs.delete(source, true);
+    fs.delete(dest, true);
+    // Create a source dir
+    fs.mkdirs(source);
+    // Create 10 dirs inside.
+    for (int i = 0; i < 10; i++) {
+      fs.mkdirs(new Path("/src/sub" + i));
+      // Create 10 directories inside each sub
+      for (int j = 0; j < 10; j++) {
+        fs.mkdirs(new Path("/src/sub" + i + "/subsub" + j));
+
+        // create 10 files under each leaf node.
+        for (int k = 0; k < 10; k++) {
+          Path parentPath = new Path("/src/sub" + i + "/subsub" + j);
+          Path filePath = new Path(parentPath, "file" + k);
+          DFSTestUtil.createFile(fs, filePath, 1024L, (short) 3, 1024L);
+        }
+      }
+    }
+
+    DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
+        dest.toString(), "-useIterator", conf);
+
+    // Check that all 1000 files got copied.
+    RemoteIterator<LocatedFileStatus> destFileItr = fs.listFiles(dest, true);
+    int numFiles = 0;

Review comment:
   ```java
   Assertions.assertThat(RemoteIterators.toList(fs.listFiles(dest, true)))
       .describedAs("files")
       .hasSize(...)
   ```

   That way, if the size assertion isn't met, the error includes the list of all files which were found.
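   For this particular test the expected count would be the 10 x 10 x 10 files created above, so (sketch):
   ```java
   Assertions.assertThat(RemoteIterators.toList(fs.listFiles(dest, true)))
       .describedAs("files copied to %s", dest)
       .hasSize(1000);  // 10 dirs x 10 subdirs x 10 files, per the setup
   ```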

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      for (int i = 0; i < numListstatusThreads; i++) {
+        workers.addWorker(
+            new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                excludeList));
+      }
+
+      for (FileStatus status : sourceDirs) {
+        workers.put(new WorkRequest<FileStatus>(status, 0));
+      }
+
+      while (workers.hasWork()) {
+        try {
+          WorkReport<FileStatus[]> workResult = workers.take();
+          int retry = workResult.getRetry();
+          for (FileStatus child : workResult.getItem()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Recording source-path: " + child.getPath() + " for copy.");
+            }
+            if (workResult.getSuccess()) {
+              LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                  DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                      preserveAcls && child.isDirectory(),
+                      preserveXAttrs && child.isDirectory(),
+                      preserveRawXattrs && child.isDirectory(),
+                      context.getBlocksPerChunk());
+
+              for (CopyListingFileStatus fs : childCopyListingStatus) {
+                if (randomizeFileListing) {
+                  addToFileListing(fileStatuses,
+                      new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+                } else {
+                  writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                }
+              }
+            }
+            if (retry < maxRetries) {
+              if (child.isDirectory()) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Traversing into source dir: " + child.getPath());
+                }
+                workers.put(new WorkRequest<FileStatus>(child, retry));
+              }
+            } else {
+              LOG.error("Giving up on " + child.getPath() + " after " + retry
+                  + " retries.");
+            }
+          }
+        } catch (InterruptedException ie) {
+          LOG.error("Could not get item from childQueue. Retrying...");
+        }
+      }
+      workers.shutdown();
+    }
+
+    private void traverseDirectoryLegacy() throws IOException {
+      Stack<FileStatus> pathStack = new Stack<FileStatus>();
+      for (FileStatus fs : sourceDirs) {
+        if (excludeList == null || !excludeList
+            .contains(fs.getPath().toUri().getPath())) {
+          pathStack.add(fs);
+        }
+      }
+      while (!pathStack.isEmpty()) {
+        prepareListing(pathStack.pop().getPath());
+      }
+    }
+
+    private void prepareListing(Path path) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recording source-path: " + path + " for copy.");
+      }
+      RemoteIterator<FileStatus> listStatus = sourceFS.listStatusIterator(path);
+      while (listStatus.hasNext()) {
+        FileStatus child = listStatus.next();
+        if (excludeList != null && excludeList
+            .contains(child.getPath().toUri().getPath())) {
+          continue;
+        }
+        LinkedList<CopyListingFileStatus> childCopyListingStatus = DistCpUtils
+            .toCopyListingFileStatus(sourceFS, child,
+                preserveAcls && child.isDirectory(),
+                preserveXAttrs && child.isDirectory(),
+                preserveRawXattrs && child.isDirectory(),
+                context.getBlocksPerChunk());
+        for (CopyListingFileStatus fs : childCopyListingStatus) {
+          if (randomizeFileListing) {
+            addToFileListing(fileStatuses,
+                new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+          } else {
+            writeToFileListing(fileListWriter, fs, sourcePathRoot);
+          }
+        }
+        if (child.isDirectory()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Traversing into source dir: " + child.getPath());
+          }
+          prepareListing(child.getPath());
+        }
+      }
+    }

Review comment:
       Can you add an `IOStatisticsLogging.logIOStatisticsAtDebug(LOG, listStatus)` call here? That way, at debug level, you get a log from s3a (and soon abfs) of what IO took place for the listing, its performance, etc. Really interesting.
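   A sketch of the placement; this assumes the three-argument overload `logIOStatisticsAtDebug(Logger, String, Object)` and the SLF4J `LOG` from the earlier comment. The call is a no-op for stores whose iterators don't expose IOStatistics:
   ```java
   import org.apache.hadoop.fs.statistics.IOStatisticsLogging;

   // at the end of prepareListing(), once listStatus is exhausted:
   IOStatisticsLogging.logIOStatisticsAtDebug(LOG,
       "listing statistics for " + path, listStatus);
   ```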

##########
File path: hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
##########
@@ -217,4 +221,42 @@ public void testPreserveEC() throws Exception {
     assertTrue("/dest/dir1/subdir1 is not erasure coded!",
         destSubDir1Status.isErasureCoded());
   }
+
+  @Test
+  public void testUseIterator() throws Exception {
+    Path source = new Path("/src");
+    Path dest = new Path("/dest");
+    fs.delete(source, true);
+    fs.delete(dest, true);
+    // Create a source dir
+    fs.mkdirs(source);
+    // Create 10 dirs inside.
+    for (int i = 0; i < 10; i++) {
+      fs.mkdirs(new Path("/src/sub" + i));
+      // Create 10 directories inside each sub
+      for (int j = 0; j < 10; j++) {
+        fs.mkdirs(new Path("/src/sub" + i + "/subsub" + j));
+
+        // create 10 files under each leaf node.
+        for (int k = 0; k < 10; k++) {
+          Path parentPath = new Path("/src/sub" + i + "/subsub" + j);
+          Path filePath = new Path(parentPath, "file" + k);
+          DFSTestUtil.createFile(fs, filePath, 1024L, (short) 3, 1024L);

Review comment:
       Actually, you can go straight to the createFile calls without doing any mkdirs. It's still going to take 10^3 calls on an object store, though. If you do move something of this size there, then (see the sketch after this list):
   1. the file creation should be done in an executor pool (see ITestPartialRenamesDeletes.createDirsAndFiles())
   2. the parameters should be configurable; a subclass getWidth() would be enough
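   A rough sketch of both points; `getWidth()` and the pool size are illustrative, and this stays HDFS-specific through `DFSTestUtil` (an object-store variant would create the files through the FileSystem API instead):
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   ExecutorService pool = Executors.newFixedThreadPool(16);
   try {
     List<Future<?>> futures = new ArrayList<>();
     for (int i = 0; i < getWidth(); i++) {      // getWidth(): hypothetical
       for (int j = 0; j < getWidth(); j++) {    // subclass hook (point 2)
         for (int k = 0; k < getWidth(); k++) {
           Path file = new Path(String.format("/src/sub%d/subsub%d/file%d",
               i, j, k));
           futures.add(pool.submit(() -> {
             DFSTestUtil.createFile(fs, file, 1024L, (short) 3, 1024L);
             return null;
           }));
         }
       }
     }
     for (Future<?> f : futures) {
       f.get();  // surface any creation failure
     }
   } finally {
     pool.shutdown();
   }
   ```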

##########
File path: hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
##########
@@ -217,4 +221,42 @@ public void testPreserveEC() throws Exception {
     assertTrue("/dest/dir1/subdir1 is not erasure coded!",
         destSubDir1Status.isErasureCoded());
   }
+
+  @Test
+  public void testUseIterator() throws Exception {
+    Path source = new Path("/src");
+    Path dest = new Path("/dest");
+    fs.delete(source, true);
+    fs.delete(dest, true);
+    // Create a source dir
+    fs.mkdirs(source);
+    // Create 10 dirs inside.
+    for (int i = 0; i < 10; i++) {
+      fs.mkdirs(new Path("/src/sub" + i));

Review comment:
       Skip this and just delegate to the children; it saves 10 calls (see the sketch below).
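   i.e. a sketch relying on HDFS create() making any missing parent directories:
   ```java
   // no mkdirs: createFile() creates the parents as a side effect
   for (int j = 0; j < 10; j++) {
     for (int k = 0; k < 10; k++) {
       DFSTestUtil.createFile(fs,
           new Path("/src/sub" + i + "/subsub" + j + "/file" + k),
           1024L, (short) 3, 1024L);
     }
   }
   ```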




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org