Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2021/03/23 21:06:55 UTC

[hadoop] branch trunk updated: HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2732). Contributed by Ayush Saxena.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 03cfc85  HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2732). Contributed by Ayush Saxena.
03cfc85 is described below

commit 03cfc852791c14fad39db4e5b14104a276c08e59
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Mar 24 02:36:26 2021 +0530

    HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2732). Contributed by Ayush Saxena.
    
    Signed-off-by: Steve Loughran <st...@apache.org>
---
 .../util/functional/CommonCallableSupplier.java    | 153 +++++++++++
 .../org/apache/hadoop/test/GenericTestUtils.java   | 149 ++++++++++-
 .../fs/s3a/impl/ITestPartialRenamesDeletes.java    |  46 +---
 .../fs/s3a/scale/ITestS3ADeleteManyFiles.java      |   2 +-
 .../org/apache/hadoop/tools/DistCpConstants.java   |   1 +
 .../org/apache/hadoop/tools/DistCpContext.java     |   4 +
 .../apache/hadoop/tools/DistCpOptionSwitch.java    |   7 +-
 .../org/apache/hadoop/tools/DistCpOptions.java     |  19 ++
 .../org/apache/hadoop/tools/OptionsParser.java     |   4 +-
 .../org/apache/hadoop/tools/SimpleCopyListing.java | 294 +++++++++++++--------
 .../hadoop-distcp/src/site/markdown/DistCp.md.vm   |   1 +
 .../org/apache/hadoop/tools/TestDistCpOptions.java |   2 +-
 .../org/apache/hadoop/tools/TestDistCpSystem.java  |   5 -
 .../hadoop/tools/TestDistCpWithRawXAttrs.java      |  23 ++
 .../tools/contract/AbstractContractDistCpTest.java |  39 +++
 .../contract/OptionalTestHDFSContractDistCp.java   |  50 ++++
 .../src/test/resources/contract/hdfs.xml           | 139 ++++++++++
 17 files changed, 773 insertions(+), 165 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
new file mode 100644
index 0000000..e2cdc0f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;
+
+/**
+ * A bridge from Callable to Supplier; catching exceptions
+ * raised by the callable and wrapping them as appropriate.
+ * @param <T> return type.
+ */
+public final class CommonCallableSupplier<T> implements Supplier {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CommonCallableSupplier.class);
+
+  private final Callable<T> call;
+
+  /**
+   * Create.
+   * @param call call to invoke.
+   */
+  public CommonCallableSupplier(final Callable<T> call) {
+    this.call = call;
+  }
+
+  @Override
+  public Object get() {
+    try {
+      return call.call();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    } catch (Exception e) {
+      throw new UncheckedIOException(new IOException(e));
+    }
+  }
+
+  /**
+   * Submit a callable into a completable future.
+   * RTEs are rethrown.
+   * Non RTEs are caught and wrapped; IOExceptions to
+   * {@code UncheckedIOException} instances.
+   * @param executor executor.
+   * @param call     call to invoke
+   * @param <T>      type
+   * @return the future to wait for
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> CompletableFuture<T> submit(final Executor executor,
+      final Callable<T> call) {
+    return CompletableFuture
+        .supplyAsync(new CommonCallableSupplier<T>(call), executor);
+  }
+
+  /**
+   * Wait for a list of futures to complete. If the list is empty,
+   * return immediately.
+   * @param futures list of futures.
+   * @throws IOException      if one of the called futures raised an IOE.
+   * @throws RuntimeException if one of the futures raised one.
+   */
+  public static <T> void waitForCompletion(
+      final List<CompletableFuture<T>> futures) throws IOException {
+    if (futures.isEmpty()) {
+      return;
+    }
+    // await completion
+    waitForCompletion(
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])));
+  }
+
+  /**
+   * Wait for a single future to complete, extracting IOEs afterwards.
+   * @param future future to wait for.
+   * @throws IOException      if one of the called futures raised an IOE.
+   * @throws RuntimeException if one of the futures raised one.
+   */
+  public static <T> void waitForCompletion(final CompletableFuture<T> future)
+      throws IOException {
+    try (DurationInfo ignore = new DurationInfo(LOG, false,
+        "Waiting for task completion")) {
+      future.join();
+    } catch (CancellationException e) {
+      throw new IOException(e);
+    } catch (CompletionException e) {
+      raiseInnerCause(e);
+    }
+  }
+
+  /**
+   * Wait for a single future to complete, ignoring exceptions raised.
+   * @param future future to wait for.
+   */
+  public static <T> void waitForCompletionIgnoringExceptions(
+      @Nullable final CompletableFuture<T> future) {
+    if (future != null) {
+      try (DurationInfo ignore = new DurationInfo(LOG, false,
+          "Waiting for task completion")) {
+        future.join();
+      } catch (Exception e) {
+        LOG.debug("Ignoring exception raised in task completion: ");
+      }
+    }
+  }
+
+  /**
+   * Block awaiting completion for any non-null future passed in;
+   * No-op if a null arg was supplied.
+   * @param future future
+   * @throws IOException      if one of the called futures raised an IOE.
+   * @throws RuntimeException if one of the futures raised one.
+   */
+  public static void maybeAwaitCompletion(
+      @Nullable final CompletableFuture<Void> future) throws IOException {
+    if (future != null) {
+      waitForCompletion(future);
+    }
+  }
+}
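
As a quick orientation to the helper above, here is a minimal usage sketch, assuming only a JDK thread pool and a trivial workload (both hypothetical, not part of the patch): submit() bridges each Callable into a CompletableFuture, and waitForCompletion() joins the batch and rethrows any IOException on the caller's thread.

// Hypothetical usage sketch for CommonCallableSupplier (not part of the patch).
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.hadoop.util.functional.CommonCallableSupplier.submit;
import static org.apache.hadoop.util.functional.CommonCallableSupplier.waitForCompletion;

public class CallableSupplierExample {
  public static void main(String[] args) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<CompletableFuture<Integer>> futures = new ArrayList<>();
      for (int i = 0; i < 8; i++) {
        final int task = i;
        // submit() wraps the Callable as a Supplier; checked exceptions
        // raised by the task surface later as unchecked exceptions
        // inside the returned future.
        futures.add(submit(pool, () -> task * task));
      }
      // waitForCompletion() joins all futures; an IOException raised by
      // any task is rethrown here on the caller's thread.
      waitForCompletion(futures);
    } finally {
      pool.shutdown();
    }
  }
}
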
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index e266f28..33ebd86 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -30,13 +30,17 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.Enumeration;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -46,8 +50,11 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Appender;
@@ -61,15 +68,28 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.util.functional.CommonCallableSupplier.submit;
+import static org.apache.hadoop.util.functional.CommonCallableSupplier.waitForCompletion;
+
 /**
  * Test provides some very generic helpers which might be used across the tests
  */
 public abstract class GenericTestUtils {
 
+  public static final int EXECUTOR_THREAD_COUNT = 64;
+
+  private static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(GenericTestUtils.class);
+
+  public static final String PREFIX = "file-";
+
   private static final AtomicInteger sequence = new AtomicInteger();
 
   /**
@@ -896,5 +916,132 @@ public abstract class GenericTestUtils {
     }
     return threadCount;
   }
+  /**
+   * Write the text to a file asynchronously. Logs the operation duration.
+   * @param fs filesystem
+   * @param path path
+   * @return future to the path created.
+   */
+  private static CompletableFuture<Path> put(FileSystem fs,
+      Path path, String text) {
+    return submit(EXECUTOR, () -> {
+      try (DurationInfo ignore =
+          new DurationInfo(LOG, false, "Creating %s", path)) {
+        createFile(fs, path, true, text.getBytes(Charsets.UTF_8));
+        return path;
+      }
+    });
+  }
+
+  /**
+   * Build a set of files in a directory tree.
+   * @param fs filesystem
+   * @param destDir destination
+   * @param depth file depth
+   * @param fileCount number of files to create.
+   * @param dirCount number of dirs to create at each level
+   * @return the list of files created.
+   */
+  public static List<Path> createFiles(final FileSystem fs,
+      final Path destDir,
+      final int depth,
+      final int fileCount,
+      final int dirCount) throws IOException {
+    return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
+        new ArrayList<Path>(fileCount),
+        new ArrayList<Path>(dirCount));
+  }
+
+  /**
+   * Build a set of files in a directory tree.
+   * @param fs filesystem
+   * @param destDir destination
+   * @param depth file depth
+   * @param fileCount number of files to create.
+   * @param dirCount number of dirs to create at each level
+   * @param paths [out] list of file paths created
+   * @param dirs [out] list of directory paths created.
+   * @return the list of files created.
+   */
+  public static List<Path> createDirsAndFiles(final FileSystem fs,
+      final Path destDir,
+      final int depth,
+      final int fileCount,
+      final int dirCount,
+      final List<Path> paths,
+      final List<Path> dirs) throws IOException {
+    buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
+    List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
+        + dirs.size());
+
+    // create directories. With dir marker retention, that adds more entries
+    // to cause deletion issues
+    try (DurationInfo ignore =
+        new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
+      for (Path path : dirs) {
+        futures.add(submit(EXECUTOR, () ->{
+          fs.mkdirs(path);
+          return path;
+        }));
+      }
+      waitForCompletion(futures);
+    }
+
+    try (DurationInfo ignore =
+        new DurationInfo(LOG, "Creating %d files", paths.size())) {
+      for (Path path : paths) {
+        futures.add(put(fs, path, path.getName()));
+      }
+      waitForCompletion(futures);
+      return paths;
+    }
+  }
 
-}
+  /**
+   * Recursive method to build up lists of files and directories.
+   * @param filePaths list of file paths to add entries to.
+   * @param dirPaths  list of directory paths to add entries to.
+   * @param destDir   destination directory.
+   * @param depth     depth of directories
+   * @param fileCount number of files.
+   * @param dirCount  number of directories.
+   */
+  public static void buildPaths(final List<Path> filePaths,
+      final List<Path> dirPaths, final Path destDir, final int depth,
+      final int fileCount, final int dirCount) {
+    if (depth <= 0) {
+      return;
+    }
+    // create the file paths
+    for (int i = 0; i < fileCount; i++) {
+      String name = filenameOfIndex(i);
+      Path p = new Path(destDir, name);
+      filePaths.add(p);
+    }
+    for (int i = 0; i < dirCount; i++) {
+      String name = String.format("dir-%03d", i);
+      Path p = new Path(destDir, name);
+      dirPaths.add(p);
+      buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
+    }
+  }
+
+  /**
+   * Given an index, return a string to use as the filename.
+   * @param i index
+   * @return name
+   */
+  public static String filenameOfIndex(final int i) {
+    return String.format("%s%03d", PREFIX, i);
+  }
+
+  /**
+   * For submitting work.
+   */
+  private static final BlockingThreadPoolExecutorService EXECUTOR =
+      BlockingThreadPoolExecutorService.newInstance(
+          EXECUTOR_THREAD_COUNT,
+          EXECUTOR_THREAD_COUNT * 2,
+          30, TimeUnit.SECONDS,
+          "test-operations");
+}
\ No newline at end of file
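
The test helpers added above are easiest to see end to end. A minimal sketch, assuming a local filesystem and a placeholder path (and that the test jar with GenericTestUtils is on the classpath): createFiles() builds the tree in parallel through the shared EXECUTOR, with names following filenameOfIndex() and the dir-%03d pattern.

// Hypothetical sketch of the new GenericTestUtils helpers (not part of the patch).
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;

public class CreateFilesExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path root = new Path("/tmp/distcp-test-tree");   // placeholder path
    // depth=2, 3 files per directory, 2 subdirectories per level.
    List<Path> files = GenericTestUtils.createFiles(fs, root, 2, 3, 2);
    System.out.println("created " + files.size() + " files under " + root);
  }
}
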
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
index c920be1..df45d0d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
@@ -79,6 +79,7 @@ import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.GenericTestUtils.buildPaths;
 import static org.apache.hadoop.test.LambdaTestUtils.eval;
 
 /**
@@ -162,8 +163,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
   public static final int DEPTH = 2;
   public static final int DEPTH_SCALED = 2;
 
-  public static final String PREFIX = "file-";
-
   /**
    * A role FS; if non-null it is closed in teardown.
    */
@@ -911,49 +910,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
   }
 
   /**
-   * Recursive method to build up lists of files and directories.
-   * @param filePaths list of file paths to add entries to.
-   * @param dirPaths list of directory paths to add entries to.
-   * @param destDir destination directory.
-   * @param depth depth of directories
-   * @param fileCount number of files.
-   * @param dirCount number of directories.
-   */
-  private static void buildPaths(
-      final List<Path> filePaths,
-      final List<Path> dirPaths,
-      final Path destDir,
-      final int depth,
-      final int fileCount,
-      final int dirCount) {
-    if (depth<=0) {
-      return;
-    }
-    // create the file paths
-    for (int i = 0; i < fileCount; i++) {
-      String name = filenameOfIndex(i);
-      Path p = new Path(destDir, name);
-      filePaths.add(p);
-    }
-    for (int i = 0; i < dirCount; i++) {
-      String name = String.format("dir-%03d", i);
-      Path p = new Path(destDir, name);
-      dirPaths.add(p);
-      buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
-    }
-
-  }
-
-  /**
-   * Given an index, return a string to use as the filename.
-   * @param i index
-   * @return name
-   */
-  public static String filenameOfIndex(final int i) {
-    return String.format("%s%03d", PREFIX, i);
-  }
-
-  /**
    * Verifies that s3:DeleteObjectVersion is not required for rename.
    * <p></p>
    * See HADOOP-17621.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
index efaec5f..a724b97 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
@@ -39,7 +39,7 @@ import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROT
 import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
 import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles;
-import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.filenameOfIndex;
+import static org.apache.hadoop.test.GenericTestUtils.filenameOfIndex;
 
 /**
  * Test some scalable operations related to file renaming and deletion.
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 2581568..c75c0e8 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -139,6 +139,7 @@ public final class DistCpConstants {
   public static final String CONF_LABEL_BLOCKS_PER_CHUNK =
       "distcp.blocks.per.chunk";
 
+  public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";
   /**
    * Constants for DistCp return code to shell / consumer of ToolRunner's run
    */
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
index 1e63d80..0d08796 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
@@ -171,6 +171,10 @@ public class DistCpContext {
     return options.getBlocksPerChunk();
   }
 
+  public boolean shouldUseIterator() {
+    return options.shouldUseIterator();
+  }
+
   public final boolean splitLargeFile() {
     return options.getBlocksPerChunk() > 0;
   }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index 3d319da..4163f82 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -239,7 +239,12 @@ public enum DistCpOptionSwitch {
    */
   DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE,
       new Option("direct", false, "Write files directly to the"
-          + " target location, avoiding temporary file rename."));
+          + " target location, avoiding temporary file rename.")),
+
+  USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR,
+      new Option("useiterator", false,
+          "Use single threaded list status iterator to build "
+              + "the listing to save the memory utilisation at the client"));
 
 
   public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct";
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 9354c5e..6315528 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -158,6 +158,8 @@ public final class DistCpOptions {
   /** Whether data should be written directly to the target paths. */
   private final boolean directWrite;
 
+  private final boolean useIterator;
+
   /**
    * File attributes for preserve.
    *
@@ -222,6 +224,8 @@ public final class DistCpOptions {
     this.trackPath = builder.trackPath;
 
     this.directWrite = builder.directWrite;
+
+    this.useIterator = builder.useIterator;
   }
 
   public Path getSourceFileListing() {
@@ -353,6 +357,10 @@ public final class DistCpOptions {
     return directWrite;
   }
 
+  public boolean shouldUseIterator() {
+    return useIterator;
+  }
+
   /**
    * Add options to configuration. These will be used in the Mapper/committer
    *
@@ -403,6 +411,9 @@ public final class DistCpOptions {
     }
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE,
             String.valueOf(directWrite));
+
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.USE_ITERATOR,
+        String.valueOf(useIterator));
   }
 
   /**
@@ -440,6 +451,7 @@ public final class DistCpOptions {
         ", copyBufferSize=" + copyBufferSize +
         ", verboseLog=" + verboseLog +
         ", directWrite=" + directWrite +
+        ", useiterator=" + useIterator +
         '}';
   }
 
@@ -491,6 +503,8 @@ public final class DistCpOptions {
 
     private boolean directWrite = false;
 
+    private boolean useIterator = false;
+
     public Builder(List<Path> sourcePaths, Path targetPath) {
       Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
           "Source paths should not be null or empty!");
@@ -748,6 +762,11 @@ public final class DistCpOptions {
       this.directWrite = newDirectWrite;
       return this;
     }
+
+    public Builder withUseIterator(boolean useItr) {
+      this.useIterator = useItr;
+      return this;
+    }
   }
 
 }
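
Programmatically, the new flag travels through DistCpOptions.Builder. A minimal sketch, assuming placeholder source/target paths and the existing appendToConf(Configuration) method on DistCpOptions (not shown in full in this hunk), of constructing options with the iterator listing enabled:

// Hypothetical sketch of building DistCpOptions with the new flag (not part of the patch).
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions;

public class UseIteratorOptionsExample {
  public static void main(String[] args) {
    DistCpOptions options = new DistCpOptions.Builder(
            Collections.singletonList(new Path("hdfs://nn1/src")),  // placeholder source
            new Path("hdfs://nn2/dest"))                            // placeholder target
        .withUseIterator(true)  // request the single-threaded listStatusIterator listing
        .build();
    Configuration conf = new Configuration();
    // appendToConf() copies the flag into distcp.use.iterator for the job.
    options.appendToConf(conf);
    System.out.println(conf.get("distcp.use.iterator"));
  }
}
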
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 1fbea9a..a4c3b0f 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -115,7 +115,9 @@ public class OptionsParser {
         .withVerboseLog(
             command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch()))
         .withDirectWrite(
-            command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()));
+            command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()))
+        .withUseIterator(
+            command.hasOption(DistCpOptionSwitch.USE_ITERATOR.getSwitch()));
 
     if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
       String[] snapshots = getVals(command,
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index ddcbb14..900ce62 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -25,6 +25,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.io.SequenceFile;
@@ -36,6 +38,8 @@ import org.apache.hadoop.tools.util.ProducerConsumer;
 import org.apache.hadoop.tools.util.WorkReport;
 import org.apache.hadoop.tools.util.WorkRequest;
 import org.apache.hadoop.tools.util.WorkRequestProcessor;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.functional.RemoteIterators;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
 
@@ -49,6 +53,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.LinkedList;
+import java.util.Stack;
 
 import static org.apache.hadoop.tools.DistCpConstants
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -94,11 +99,9 @@ public class SimpleCopyListing extends CopyListing {
     randomizeFileListing = getConf().getBoolean(
         DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
         DEFAULT_RANDOMIZE_FILE_LISTING);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("numListstatusThreads=" + numListstatusThreads
-          + ", fileStatusLimit=" + fileStatusLimit
-          + ", randomizeFileListing=" + randomizeFileListing);
-    }
+    LOG.debug(
+        "numListstatusThreads={}, fileStatusLimit={}, randomizeFileListing={}",
+        numListstatusThreads, fileStatusLimit, randomizeFileListing);
     copyFilter = CopyFilter.getCopyFilter(getConf());
     copyFilter.initialize();
   }
@@ -286,10 +289,8 @@ public class SimpleCopyListing extends CopyListing {
 
           FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget());
           if (sourceStatus.isDirectory()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Adding source dir for traverse: " +
-                  sourceStatus.getPath());
-            }
+            LOG.debug("Adding source dir for traverse: {}",
+                sourceStatus.getPath());
 
             HashSet<String> excludeList =
                 distCpSync.getTraverseExcludeList(diff.getSource(),
@@ -298,8 +299,9 @@ public class SimpleCopyListing extends CopyListing {
             ArrayList<FileStatus> sourceDirs = new ArrayList<>();
             sourceDirs.add(sourceStatus);
 
-            traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-                sourceRoot, context, excludeList, fileStatuses);
+            new TraverseDirectory(fileListWriter, sourceFS, sourceDirs,
+                sourceRoot, context, excludeList, fileStatuses)
+                .traverseDirectory();
           }
         }
       }
@@ -366,9 +368,8 @@ public class SimpleCopyListing extends CopyListing {
         if (explore) {
           ArrayList<FileStatus> sourceDirs = new ArrayList<FileStatus>();
           for (FileStatus sourceStatus: sourceFiles) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
-            }
+            LOG.debug("Recording source-path: {} for copy.",
+                sourceStatus.getPath());
             LinkedList<CopyListingFileStatus> sourceCopyListingStatus =
                 DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
                     preserveAcls && sourceStatus.isDirectory(),
@@ -384,14 +385,13 @@ public class SimpleCopyListing extends CopyListing {
               }
             }
             if (sourceStatus.isDirectory()) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath());
-              }
+              LOG.debug("Adding source dir for traverse: {}",
+                  sourceStatus.getPath());
               sourceDirs.add(sourceStatus);
             }
           }
-          traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-              sourcePathRoot, context, null, statusList);
+          new TraverseDirectory(fileListWriter, sourceFS, sourceDirs,
+              sourcePathRoot, context, null, statusList).traverseDirectory();
         }
       }
       if (randomizeFileListing) {
@@ -429,16 +429,12 @@ public class SimpleCopyListing extends CopyListing {
      */
     Collections.shuffle(fileStatusInfoList, rnd);
     for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
-      }
+      LOG.debug("Adding {}", fileStatusInfo.fileStatus.getPath());
       writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
           fileStatusInfo.sourceRootPath);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Number of paths written to fileListing="
-          + fileStatusInfoList.size());
-    }
+    LOG.debug("Number of paths written to fileListing={}",
+        fileStatusInfoList.size());
     fileStatusInfoList.clear();
   }
 
@@ -590,8 +586,8 @@ public class SimpleCopyListing extends CopyListing {
         result = new WorkReport<FileStatus[]>(getFileStatus(parent.getPath()),
                 retry, true);
       } catch (FileNotFoundException fnf) {
-        LOG.error("FileNotFoundException exception in listStatus: " +
-                  fnf.getMessage());
+        LOG.error("FileNotFoundException exception in listStatus: {}",
+            fnf.getMessage());
         result = new WorkReport<FileStatus[]>(new FileStatus[0], retry, true,
                                               fnf);
       } catch (Exception e) {
@@ -605,8 +601,7 @@ public class SimpleCopyListing extends CopyListing {
   }
 
   private void printStats() {
-    LOG.info("Paths (files+dirs) cnt = " + totalPaths +
-             "; dirCnt = " + totalDirs);
+    LOG.info("Paths (files+dirs) cnt = {}; dirCnt = ", totalPaths, totalDirs);
   }
 
   private void maybePrintStats() {
@@ -615,79 +610,6 @@ public class SimpleCopyListing extends CopyListing {
     }
   }
 
-  private void traverseDirectory(SequenceFile.Writer fileListWriter,
-                                 FileSystem sourceFS,
-                                 ArrayList<FileStatus> sourceDirs,
-                                 Path sourcePathRoot,
-                                 DistCpContext context,
-                                 HashSet<String> excludeList,
-                                 List<FileStatusInfo> fileStatuses)
-                                 throws IOException {
-    final boolean preserveAcls = context.shouldPreserve(FileAttribute.ACL);
-    final boolean preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
-    final boolean preserveRawXattrs = context.shouldPreserveRawXattrs();
-
-    assert numListstatusThreads > 0;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting thread pool of " + numListstatusThreads +
-          " listStatus workers.");
-    }
-    ProducerConsumer<FileStatus, FileStatus[]> workers =
-        new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
-    for (int i = 0; i < numListstatusThreads; i++) {
-      workers.addWorker(
-          new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
-              excludeList));
-    }
-
-    for (FileStatus status : sourceDirs) {
-      workers.put(new WorkRequest<FileStatus>(status, 0));
-    }
-
-    while (workers.hasWork()) {
-      try {
-        WorkReport<FileStatus[]> workResult = workers.take();
-        int retry = workResult.getRetry();
-        for (FileStatus child: workResult.getItem()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
-          }
-          if (workResult.getSuccess()) {
-            LinkedList<CopyListingFileStatus> childCopyListingStatus =
-              DistCpUtils.toCopyListingFileStatus(sourceFS, child,
-                preserveAcls && child.isDirectory(),
-                preserveXAttrs && child.isDirectory(),
-                preserveRawXattrs && child.isDirectory(),
-                context.getBlocksPerChunk());
-
-            for (CopyListingFileStatus fs : childCopyListingStatus) {
-              if (randomizeFileListing) {
-                addToFileListing(fileStatuses,
-                    new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
-              } else {
-                writeToFileListing(fileListWriter, fs, sourcePathRoot);
-              }
-            }
-          }
-          if (retry < maxRetries) {
-            if (child.isDirectory()) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Traversing into source dir: " + child.getPath());
-              }
-              workers.put(new WorkRequest<FileStatus>(child, retry));
-            }
-          } else {
-            LOG.error("Giving up on " + child.getPath() +
-                      " after " + retry + " retries.");
-          }
-        }
-      } catch (InterruptedException ie) {
-        LOG.error("Could not get item from childQueue. Retrying...");
-      }
-    }
-    workers.shutdown();
-  }
-
   private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
       LinkedList<CopyListingFileStatus> fileStatus, Path sourcePathRoot,
       DistCpContext context) throws IOException {
@@ -697,9 +619,7 @@ public class SimpleCopyListing extends CopyListing {
       if (fs.getPath().equals(sourcePathRoot) &&
           fs.isDirectory() && syncOrOverwrite) {
         // Skip the root-paths when syncOrOverwrite
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip " + fs.getPath());
-        }
+        LOG.debug("Skip {}", fs.getPath());
         return;
       }
       writeToFileListing(fileListWriter, fs, sourcePathRoot);
@@ -709,10 +629,9 @@ public class SimpleCopyListing extends CopyListing {
   private void writeToFileListing(SequenceFile.Writer fileListWriter,
                                   CopyListingFileStatus fileStatus,
                                   Path sourcePathRoot) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
-        fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
-    }
+    LOG.debug("REL PATH: {}, FULL PATH: {}",
+        DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()),
+        fileStatus.getPath());
 
     if (!shouldCopy(fileStatus.getPath())) {
       return;
@@ -730,4 +649,159 @@ public class SimpleCopyListing extends CopyListing {
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        try (DurationInfo ignored = new DurationInfo(LOG,
+            "Building listing using iterator mode for %s", sourcePathRoot)) {
+          traverseDirectoryLegacy();
+        }
+      } else {
+        try (DurationInfo ignored = new DurationInfo(LOG,
+            "Building listing using multi threaded approach for %s",
+            sourcePathRoot)) {
+          traverseDirectoryMultiThreaded();
+        }
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+
+      LOG.debug("Starting thread pool of {} listStatus workers.",
+          numListstatusThreads);
+
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      try {
+        for (int i = 0; i < numListstatusThreads; i++) {
+          workers.addWorker(
+              new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                  excludeList));
+        }
+
+        for (FileStatus status : sourceDirs) {
+          workers.put(new WorkRequest<FileStatus>(status, 0));
+        }
+
+        while (workers.hasWork()) {
+          try {
+            WorkReport<FileStatus[]> workResult = workers.take();
+            int retry = workResult.getRetry();
+            for (FileStatus child : workResult.getItem()) {
+              LOG.debug("Recording source-path: {} for copy.", child.getPath());
+              boolean isChildDirectory = child.isDirectory();
+              if (workResult.getSuccess()) {
+                LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                    DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                        preserveAcls && isChildDirectory,
+                        preserveXAttrs && isChildDirectory,
+                        preserveRawXattrs && isChildDirectory,
+                        context.getBlocksPerChunk());
+
+                for (CopyListingFileStatus fs : childCopyListingStatus) {
+                  if (randomizeFileListing) {
+                    addToFileListing(fileStatuses,
+                        new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+                  } else {
+                    writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                  }
+                }
+              }
+              if (retry < maxRetries) {
+                if (isChildDirectory) {
+                  LOG.debug("Traversing into source dir: {}", child.getPath());
+                  workers.put(new WorkRequest<FileStatus>(child, retry));
+                }
+              } else {
+                LOG.error("Giving up on {} after {} retries.", child.getPath(),
+                    retry);
+              }
+            }
+          } catch (InterruptedException ie) {
+            LOG.error("Could not get item from childQueue. Retrying...");
+          }
+        }
+      } finally {
+        workers.shutdown();
+      }
+    }
+
+    private void traverseDirectoryLegacy() throws IOException {
+      Stack<FileStatus> pathStack = new Stack<FileStatus>();
+      for (FileStatus fs : sourceDirs) {
+        if (excludeList == null || !excludeList
+            .contains(fs.getPath().toUri().getPath())) {
+          pathStack.add(fs);
+        }
+      }
+      while (!pathStack.isEmpty()) {
+        prepareListing(pathStack.pop().getPath());
+      }
+    }
+
+    private void prepareListing(Path path) throws IOException {
+      LOG.debug("Recording source-path: {} for copy.", path);
+      RemoteIterator<FileStatus> listStatus = RemoteIterators
+          .filteringRemoteIterator(sourceFS.listStatusIterator(path),
+              i -> excludeList == null || !excludeList
+                  .contains(i.getPath().toUri().getPath()));
+      while (listStatus.hasNext()) {
+        FileStatus child = listStatus.next();
+        LinkedList<CopyListingFileStatus> childCopyListingStatus = DistCpUtils
+            .toCopyListingFileStatus(sourceFS, child,
+                preserveAcls && child.isDirectory(),
+                preserveXAttrs && child.isDirectory(),
+                preserveRawXattrs && child.isDirectory(),
+                context.getBlocksPerChunk());
+        for (CopyListingFileStatus fs : childCopyListingStatus) {
+          if (randomizeFileListing) {
+            addToFileListing(fileStatuses,
+                new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+          } else {
+            writeToFileListing(fileListWriter, fs, sourcePathRoot);
+          }
+        }
+        if (child.isDirectory()) {
+          LOG.debug("Traversing into source dir: {}", child.getPath());
+          prepareListing(child.getPath());
+        }
+      }
+      IOStatisticsLogging
+          .logIOStatisticsAtDebug(LOG, "RemoteIterator Statistics: {}",
+              listStatus);
+    }
+  }
 }
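
The memory saving comes from prepareListing() above: one RemoteIterator page is consumed at a time instead of materialising whole listStatus() arrays across a thread pool. A stripped-down sketch of the same walk, assuming a local filesystem, no exclude list, and plain printing in place of writeToFileListing():

// Hypothetical, stripped-down version of the iterator walk in prepareListing()
// (not part of the patch; no exclude list, no CopyListingFileStatus conversion).
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class IteratorListingSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Deque<Path> pending = new ArrayDeque<>();
    pending.push(new Path("/tmp"));               // placeholder root
    while (!pending.isEmpty()) {
      // Only one page of listing results is held in memory at a time,
      // unlike the multi-threaded listStatus() approach.
      RemoteIterator<FileStatus> it = fs.listStatusIterator(pending.pop());
      while (it.hasNext()) {
        FileStatus child = it.next();
        System.out.println(child.getPath());      // stand-in for writeToFileListing()
        if (child.isDirectory()) {
          pending.push(child.getPath());          // traverse into subdirectories
        }
      }
    }
  }
}
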
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index e82d8bc..136b6c8 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -362,6 +362,7 @@ Command Line Options
 | `-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B | |
 | `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. |
 | `-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store |
+| `-useiterator` | Uses single threaded listStatusIterator to build listing | Useful for saving memory at the client side. Using this option will ignore the numListstatusThreads option |
 
 Architecture of DistCp
 ----------------------
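
Beyond the command line, the same flag can be passed through ToolRunner. A minimal sketch, assuming placeholder cluster paths and the public DistCp(Configuration, DistCpOptions) constructor; this is the programmatic equivalent of "hadoop distcp -useiterator <src> <dst>":

// Hypothetical sketch: invoking DistCp with -useiterator via ToolRunner
// (not part of the patch; paths are placeholders).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.util.ToolRunner;

public class RunDistCpWithIterator {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    int rc = ToolRunner.run(conf, new DistCp(conf, null), new String[] {
        "-useiterator",            // build the copy listing with listStatusIterator
        "hdfs://nn1/source/dir",   // placeholder source
        "hdfs://nn2/target/dir"    // placeholder target
    });
    System.exit(rc);
  }
}
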
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
index 7382795..1349702 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
@@ -289,7 +289,7 @@ public class TestDistCpOptions {
         "atomicWorkPath=null, logPath=null, sourceFileListing=abc, " +
         "sourcePaths=null, targetPath=xyz, filtersFile='null', " +
         "blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " +
-        "directWrite=false}";
+        "directWrite=false, useiterator=false}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
index 14cce42..47b850f 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
@@ -48,9 +48,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.Timeout;
 
 /**
  * A JUnit test for copying files recursively.
@@ -60,9 +58,6 @@ public class TestDistCpSystem {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestDistCpSystem.class);
 
-  @Rule
-  public Timeout globalTimeout = new Timeout(30000);
-
   private static final String SRCDAT = "srcdat";
   private static final String DSTDAT = "dstdat";
   private static final long BLOCK_SIZE = 1024;
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
index 978ccdd..e0e103b 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
@@ -29,9 +29,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.tools.ECAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.util.DistCpTestUtils;
 
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -69,6 +72,7 @@ public class TestDistCpWithRawXAttrs {
   public static void init() throws Exception {
     conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true)
             .build();
     cluster.waitActive();
@@ -217,4 +221,23 @@ public class TestDistCpWithRawXAttrs {
     assertTrue("/dest/dir1/subdir1 is not erasure coded!",
         destSubDir1Status.isErasureCoded());
   }
+
+  @Test
+  public void testUseIterator() throws Exception {
+
+    Path source = new Path("/src");
+    Path dest = new Path("/dest");
+    fs.delete(source, true);
+    fs.delete(dest, true);
+    // Create a source dir
+    fs.mkdirs(source);
+
+    GenericTestUtils.createFiles(fs, source, 3, 10, 10);
+
+    DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
+        dest.toString(), "-useiterator", conf);
+
+    Assertions.assertThat(RemoteIterators.toList(fs.listFiles(dest, true)))
+        .describedAs("files").hasSize(1110);
+  }
 }
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
index 1a40d78..202ead6 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
@@ -44,7 +44,9 @@ import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.apache.hadoop.tools.util.DistCpTestUtils;
+import org.apache.hadoop.util.functional.RemoteIterators;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,6 +61,7 @@ import org.slf4j.LoggerFactory;
  * under test.  The tests in the suite cover both copying from local to remote
  * (e.g. a backup use case) and copying from remote to local (e.g. a restore use
  * case).
+ * The HDFS contract test needs to be run explicitly.
  */
 public abstract class AbstractContractDistCpTest
     extends AbstractFSContractTestBase {
@@ -613,6 +616,42 @@ public abstract class AbstractContractDistCpTest
     directWrite(localFS, localDir, remoteFS, remoteDir, false);
   }
 
+  @Test
+  public void testDistCpWithIterator() throws Exception {
+    describe("Build listing in distCp using the iterator option.");
+    Path source = new Path(remoteDir, "src");
+    Path dest = new Path(localDir, "dest");
+    dest = localFS.makeQualified(dest);
+    mkdirs(remoteFS, source);
+    verifyPathExists(remoteFS, "", source);
+
+    GenericTestUtils
+        .createFiles(remoteFS, source, getDepth(), getWidth(), getWidth());
+
+    DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
+        dest.toString(), "-useiterator", conf);
+
+    Assertions
+        .assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
+        .describedAs("files").hasSize(getTotalFiles());
+  }
+
+  public int getDepth() {
+    return 3;
+  }
+
+  public int getWidth() {
+    return 10;
+  }
+
+  private int getTotalFiles() {
+    int totalFiles = 0;
+    for (int i = 1; i <= getDepth(); i++) {
+      totalFiles += Math.pow(getWidth(), i);
+    }
+    return totalFiles;
+  }
+
   /**
    * Executes a test with support for using direct write option.
    *
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java
new file mode 100644
index 0000000..d8c7424
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.hdfs.HDFSContract;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+/**
+ * Verifies that HDFS passes all the tests in
+ * {@link AbstractContractDistCpTest}.
+ * As such, it acts as an in-module validation of this contract test itself.
+ */
+public class OptionalTestHDFSContractDistCp extends AbstractContractDistCpTest {
+
+  @BeforeClass
+  public static void createCluster() throws IOException {
+    HDFSContract.createCluster();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws IOException {
+    HDFSContract.destroyCluster();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HDFSContract(conf);
+  }
+}
diff --git a/hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml b/hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml
new file mode 100644
index 0000000..3c9396f
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml
@@ -0,0 +1,139 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+  <!--
+  Here are most of the HDFS contract options.
+  -->
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.file.contract.test.random-seek-count</name>
+    <value>500</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-returns-false-if-dest-exists</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-returns-false-if-source-missing</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-settimes</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-getfilestatus</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-file-reference</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-content-check</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unbuffer</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-hflush</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-hsync</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.metadata_updated_on_hsync</name>
+    <value>false</value>
+  </property>
+
+  <!-- Disable min block size since most tests use tiny blocks -->
+  <property>
+    <name>dfs.namenode.fs-limits.min-block-size</name>
+    <value>0</value>
+  </property>
+</configuration>
