Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2021/03/23 21:06:55 UTC
[hadoop] branch trunk updated: HADOOP-17531. DistCp: Reduce memory
usage on copying huge directories. (#2732). Contributed by Ayush Saxena.
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 03cfc85 HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2732). Contributed by Ayush Saxena.
03cfc85 is described below
commit 03cfc852791c14fad39db4e5b14104a276c08e59
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Mar 24 02:36:26 2021 +0530
HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2732). Contributed by Ayush Saxena.
Signed-off-by: Steve Loughran <st...@apache.org>
---
.../util/functional/CommonCallableSupplier.java | 153 +++++++++++
.../org/apache/hadoop/test/GenericTestUtils.java | 149 ++++++++++-
.../fs/s3a/impl/ITestPartialRenamesDeletes.java | 46 +---
.../fs/s3a/scale/ITestS3ADeleteManyFiles.java | 2 +-
.../org/apache/hadoop/tools/DistCpConstants.java | 1 +
.../org/apache/hadoop/tools/DistCpContext.java | 4 +
.../apache/hadoop/tools/DistCpOptionSwitch.java | 7 +-
.../org/apache/hadoop/tools/DistCpOptions.java | 19 ++
.../org/apache/hadoop/tools/OptionsParser.java | 4 +-
.../org/apache/hadoop/tools/SimpleCopyListing.java | 294 +++++++++++++--------
.../hadoop-distcp/src/site/markdown/DistCp.md.vm | 1 +
.../org/apache/hadoop/tools/TestDistCpOptions.java | 2 +-
.../org/apache/hadoop/tools/TestDistCpSystem.java | 5 -
.../hadoop/tools/TestDistCpWithRawXAttrs.java | 23 ++
.../tools/contract/AbstractContractDistCpTest.java | 39 +++
.../contract/OptionalTestHDFSContractDistCp.java | 50 ++++
.../src/test/resources/contract/hdfs.xml | 139 ++++++++++
17 files changed, 773 insertions(+), 165 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
new file mode 100644
index 0000000..e2cdc0f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;
+
+/**
+ * A bridge from Callable to Supplier, catching exceptions
+ * raised by the callable and wrapping them as appropriate.
+ * @param <T> return type.
+ */
+public final class CommonCallableSupplier<T> implements Supplier<T> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CommonCallableSupplier.class);
+
+ private final Callable<T> call;
+
+ /**
+ * Create.
+ * @param call call to invoke.
+ */
+ public CommonCallableSupplier(final Callable<T> call) {
+ this.call = call;
+ }
+
+ @Override
+ public T get() {
+ try {
+ return call.call();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ } catch (Exception e) {
+ throw new UncheckedIOException(new IOException(e));
+ }
+ }
+
+ /**
+ * Submit a callable into a completable future.
+ * RTEs are rethrown.
+ * Non RTEs are caught and wrapped; IOExceptions to
+ * {@code UncheckedIOException} instances.
+ * @param executor executor.
+ * @param call call to invoke
+ * @param <T> type
+ * @return the future to wait for
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> CompletableFuture<T> submit(final Executor executor,
+ final Callable<T> call) {
+ return CompletableFuture
+ .supplyAsync(new CommonCallableSupplier<T>(call), executor);
+ }
+
+ /**
+ * Wait for a list of futures to complete. If the list is empty,
+ * return immediately.
+ * @param futures list of futures.
+ * @throws IOException if one of the called futures raised an IOE.
+ * @throws RuntimeException if one of the futures raised one.
+ */
+ public static <T> void waitForCompletion(
+ final List<CompletableFuture<T>> futures) throws IOException {
+ if (futures.isEmpty()) {
+ return;
+ }
+ // await completion
+ waitForCompletion(
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])));
+ }
+
+ /**
+ * Wait for a single future to complete, extracting IOEs afterwards.
+ * @param future future to wait for.
+ * @throws IOException if one of the called futures raised an IOE.
+ * @throws RuntimeException if one of the futures raised one.
+ */
+ public static <T> void waitForCompletion(final CompletableFuture<T> future)
+ throws IOException {
+ try (DurationInfo ignore = new DurationInfo(LOG, false,
+ "Waiting for task completion")) {
+ future.join();
+ } catch (CancellationException e) {
+ throw new IOException(e);
+ } catch (CompletionException e) {
+ raiseInnerCause(e);
+ }
+ }
+
+ /**
+ * Wait for a single future to complete, ignoring any exceptions raised.
+ * @param future future to wait for.
+ */
+ public static <T> void waitForCompletionIgnoringExceptions(
+ @Nullable final CompletableFuture<T> future) {
+ if (future != null) {
+ try (DurationInfo ignore = new DurationInfo(LOG, false,
+ "Waiting for task completion")) {
+ future.join();
+ } catch (Exception e) {
+ LOG.debug("Ignoring exception raised in task completion: ");
+ }
+ }
+ }
+
+ /**
+ * Block awaiting completion for any non-null future passed in;
+ * no-op if a null arg was supplied.
+ * @param future future
+ * @throws IOException if one of the called futures raised an IOE.
+ * @throws RuntimeException if one of the futures raised one.
+ */
+ public static void maybeAwaitCompletion(
+ @Nullable final CompletableFuture<Void> future) throws IOException {
+ if (future != null) {
+ waitForCompletion(future);
+ }
+ }
+}
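
For reference, a minimal usage sketch of the new helper (not part of the patch; the pool size and workload are assumptions): tasks go in through submit(), and waitForCompletion() re-raises any IOException thrown inside a task.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.hadoop.util.functional.CommonCallableSupplier.submit;
import static org.apache.hadoop.util.functional.CommonCallableSupplier.waitForCompletion;

public class CallableSupplierSketch {
  public static void main(String[] args) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
    try {
      List<CompletableFuture<Integer>> futures = new ArrayList<>();
      for (int i = 0; i < 8; i++) {
        final int id = i;
        // The supplier wraps the Callable; checked exceptions surface
        // as UncheckedIOException inside the future.
        futures.add(submit(pool, () -> id * 2));
      }
      // Blocks until all tasks complete, rethrowing any inner IOE/RTE.
      waitForCompletion(futures);
    } finally {
      pool.shutdown();
    }
  }
}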
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index e266f28..33ebd86 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -30,13 +30,17 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.Enumeration;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -46,8 +50,11 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Appender;
@@ -61,15 +68,28 @@ import org.junit.Assert;
import org.junit.Assume;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.util.functional.CommonCallableSupplier.submit;
+import static org.apache.hadoop.util.functional.CommonCallableSupplier.waitForCompletion;
+
/**
* Test provides some very generic helpers which might be used across the tests
*/
public abstract class GenericTestUtils {
+ public static final int EXECUTOR_THREAD_COUNT = 64;
+
+ private static final org.slf4j.Logger LOG =
+ LoggerFactory.getLogger(GenericTestUtils.class);
+
+ public static final String PREFIX = "file-";
+
private static final AtomicInteger sequence = new AtomicInteger();
/**
@@ -896,5 +916,132 @@ public abstract class GenericTestUtils {
}
return threadCount;
}
+ /**
+ * Write the text to a file asynchronously. Logs the operation duration.
+ * @param fs filesystem
+ * @param path path
+ * @return future to the path created.
+ */
+ private static CompletableFuture<Path> put(FileSystem fs,
+ Path path, String text) {
+ return submit(EXECUTOR, () -> {
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Creating %s", path)) {
+ createFile(fs, path, true, text.getBytes(Charsets.UTF_8));
+ return path;
+ }
+ });
+ }
+
+ /**
+ * Build a set of files in a directory tree.
+ * @param fs filesystem
+ * @param destDir destination
+ * @param depth file depth
+ * @param fileCount number of files to create.
+ * @param dirCount number of dirs to create at each level
+ * @return the list of files created.
+ */
+ public static List<Path> createFiles(final FileSystem fs,
+ final Path destDir,
+ final int depth,
+ final int fileCount,
+ final int dirCount) throws IOException {
+ return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
+ new ArrayList<Path>(fileCount),
+ new ArrayList<Path>(dirCount));
+ }
+
+ /**
+ * Build a set of files in a directory tree.
+ * @param fs filesystem
+ * @param destDir destination
+ * @param depth file depth
+ * @param fileCount number of files to create.
+ * @param dirCount number of dirs to create at each level
+ * @param paths [out] list of file paths created
+ * @param dirs [out] list of directory paths created.
+ * @return the list of files created.
+ */
+ public static List<Path> createDirsAndFiles(final FileSystem fs,
+ final Path destDir,
+ final int depth,
+ final int fileCount,
+ final int dirCount,
+ final List<Path> paths,
+ final List<Path> dirs) throws IOException {
+ buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
+ List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
+ + dirs.size());
+
+ // create directories. With dir marker retention, that adds more entries
+ // to cause deletion issues
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
+ for (Path path : dirs) {
+ futures.add(submit(EXECUTOR, () -> {
+ fs.mkdirs(path);
+ return path;
+ }));
+ }
+ waitForCompletion(futures);
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, "Creating %d files", paths.size())) {
+ for (Path path : paths) {
+ futures.add(put(fs, path, path.getName()));
+ }
+ waitForCompletion(futures);
+ return paths;
+ }
+ }
-}
+ /**
+ * Recursive method to build up lists of files and directories.
+ * @param filePaths list of file paths to add entries to.
+ * @param dirPaths list of directory paths to add entries to.
+ * @param destDir destination directory.
+ * @param depth depth of directories
+ * @param fileCount number of files.
+ * @param dirCount number of directories.
+ */
+ public static void buildPaths(final List<Path> filePaths,
+ final List<Path> dirPaths, final Path destDir, final int depth,
+ final int fileCount, final int dirCount) {
+ if (depth <= 0) {
+ return;
+ }
+ // create the file paths
+ for (int i = 0; i < fileCount; i++) {
+ String name = filenameOfIndex(i);
+ Path p = new Path(destDir, name);
+ filePaths.add(p);
+ }
+ for (int i = 0; i < dirCount; i++) {
+ String name = String.format("dir-%03d", i);
+ Path p = new Path(destDir, name);
+ dirPaths.add(p);
+ buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
+ }
+ }
+
+ /**
+ * Given an index, return a string to use as the filename.
+ * @param i index
+ * @return name
+ */
+ public static String filenameOfIndex(final int i) {
+ return String.format("%s%03d", PREFIX, i);
+ }
+
+ /**
+ * For submitting work.
+ */
+ private static final BlockingThreadPoolExecutorService EXECUTOR =
+ BlockingThreadPoolExecutorService.newInstance(
+ EXECUTOR_THREAD_COUNT,
+ EXECUTOR_THREAD_COUNT * 2,
+ 30, TimeUnit.SECONDS,
+ "test-operations");
+}
\ No newline at end of file
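
A quick sketch of how the relocated helpers compose (illustrative only; the local scratch path is an assumption): createFiles() drives buildPaths() and then creates everything in parallel on the shared EXECUTOR.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;

public class CreateFilesSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path dest = new Path("/tmp/createfiles-demo"); // hypothetical scratch dir
    // depth=2, 3 files and 2 dirs per level: 3 files at the root plus
    // 3 files in each of the 2 subdirectories = 9 files in total.
    List<Path> files = GenericTestUtils.createFiles(fs, dest, 2, 3, 2);
    System.out.println("created " + files.size() + " files"); // 9
  }
}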
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
index c920be1..df45d0d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
@@ -79,6 +79,7 @@ import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.GenericTestUtils.buildPaths;
import static org.apache.hadoop.test.LambdaTestUtils.eval;
/**
@@ -162,8 +163,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
public static final int DEPTH = 2;
public static final int DEPTH_SCALED = 2;
- public static final String PREFIX = "file-";
-
/**
* A role FS; if non-null it is closed in teardown.
*/
@@ -911,49 +910,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
}
/**
- * Recursive method to build up lists of files and directories.
- * @param filePaths list of file paths to add entries to.
- * @param dirPaths list of directory paths to add entries to.
- * @param destDir destination directory.
- * @param depth depth of directories
- * @param fileCount number of files.
- * @param dirCount number of directories.
- */
- private static void buildPaths(
- final List<Path> filePaths,
- final List<Path> dirPaths,
- final Path destDir,
- final int depth,
- final int fileCount,
- final int dirCount) {
- if (depth<=0) {
- return;
- }
- // create the file paths
- for (int i = 0; i < fileCount; i++) {
- String name = filenameOfIndex(i);
- Path p = new Path(destDir, name);
- filePaths.add(p);
- }
- for (int i = 0; i < dirCount; i++) {
- String name = String.format("dir-%03d", i);
- Path p = new Path(destDir, name);
- dirPaths.add(p);
- buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
- }
-
- }
-
- /**
- * Given an index, return a string to use as the filename.
- * @param i index
- * @return name
- */
- public static String filenameOfIndex(final int i) {
- return String.format("%s%03d", PREFIX, i);
- }
-
- /**
* Verifies that s3:DeleteObjectVersion is not required for rename.
* <p></p>
* See HADOOP-17621.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
index efaec5f..a724b97 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
@@ -39,7 +39,7 @@ import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROT
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles;
-import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.filenameOfIndex;
+import static org.apache.hadoop.test.GenericTestUtils.filenameOfIndex;
/**
* Test some scalable operations related to file renaming and deletion.
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 2581568..c75c0e8 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -139,6 +139,7 @@ public final class DistCpConstants {
public static final String CONF_LABEL_BLOCKS_PER_CHUNK =
"distcp.blocks.per.chunk";
+ public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";
/**
* Constants for DistCp return code to shell / consumer of ToolRunner's run
*/
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
index 1e63d80..0d08796 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
@@ -171,6 +171,10 @@ public class DistCpContext {
return options.getBlocksPerChunk();
}
+ public boolean shouldUseIterator() {
+ return options.shouldUseIterator();
+ }
+
public final boolean splitLargeFile() {
return options.getBlocksPerChunk() > 0;
}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index 3d319da..4163f82 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -239,7 +239,12 @@ public enum DistCpOptionSwitch {
*/
DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE,
new Option("direct", false, "Write files directly to the"
- + " target location, avoiding temporary file rename."));
+ + " target location, avoiding temporary file rename.")),
+
+ USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR,
+ new Option("useiterator", false,
+ "Use single threaded list status iterator to build "
+ + "the listing to save the memory utilisation at the client"));
public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct";
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 9354c5e..6315528 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -158,6 +158,8 @@ public final class DistCpOptions {
/** Whether data should be written directly to the target paths. */
private final boolean directWrite;
+ private final boolean useIterator;
+
/**
* File attributes for preserve.
*
@@ -222,6 +224,8 @@ public final class DistCpOptions {
this.trackPath = builder.trackPath;
this.directWrite = builder.directWrite;
+
+ this.useIterator = builder.useIterator;
}
public Path getSourceFileListing() {
@@ -353,6 +357,10 @@ public final class DistCpOptions {
return directWrite;
}
+ public boolean shouldUseIterator() {
+ return useIterator;
+ }
+
/**
* Add options to configuration. These will be used in the Mapper/committer
*
@@ -403,6 +411,9 @@ public final class DistCpOptions {
}
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE,
String.valueOf(directWrite));
+
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.USE_ITERATOR,
+ String.valueOf(useIterator));
}
/**
@@ -440,6 +451,7 @@ public final class DistCpOptions {
", copyBufferSize=" + copyBufferSize +
", verboseLog=" + verboseLog +
", directWrite=" + directWrite +
+ ", useiterator=" + useIterator +
'}';
}
@@ -491,6 +503,8 @@ public final class DistCpOptions {
private boolean directWrite = false;
+ private boolean useIterator = false;
+
public Builder(List<Path> sourcePaths, Path targetPath) {
Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
"Source paths should not be null or empty!");
@@ -748,6 +762,11 @@ public final class DistCpOptions {
this.directWrite = newDirectWrite;
return this;
}
+
+ public Builder withUseIterator(boolean useItr) {
+ this.useIterator = useItr;
+ return this;
+ }
}
}
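
The new option can also be set programmatically through the builder; a hedged sketch (the source and target URIs are placeholders):

import java.util.Collections;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions;

public class UseIteratorOptionSketch {
  public static void main(String[] args) {
    DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path("hdfs://nn1/src")),
        new Path("hdfs://nn2/dest"))
        .withUseIterator(true) // single-threaded listing, lower client memory
        .build();
    System.out.println(options.shouldUseIterator()); // true
  }
}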
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 1fbea9a..a4c3b0f 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -115,7 +115,9 @@ public class OptionsParser {
.withVerboseLog(
command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch()))
.withDirectWrite(
- command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()));
+ command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()))
+ .withUseIterator(
+ command.hasOption(DistCpOptionSwitch.USE_ITERATOR.getSwitch()));
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
String[] snapshots = getVals(command,
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index ddcbb14..900ce62 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -25,6 +25,8 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.io.SequenceFile;
@@ -36,6 +38,8 @@ import org.apache.hadoop.tools.util.ProducerConsumer;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
import org.apache.hadoop.tools.util.WorkRequestProcessor;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
@@ -49,6 +53,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.LinkedList;
+import java.util.Stack;
import static org.apache.hadoop.tools.DistCpConstants
.HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -94,11 +99,9 @@ public class SimpleCopyListing extends CopyListing {
randomizeFileListing = getConf().getBoolean(
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
DEFAULT_RANDOMIZE_FILE_LISTING);
- if (LOG.isDebugEnabled()) {
- LOG.debug("numListstatusThreads=" + numListstatusThreads
- + ", fileStatusLimit=" + fileStatusLimit
- + ", randomizeFileListing=" + randomizeFileListing);
- }
+ LOG.debug(
+ "numListstatusThreads={}, fileStatusLimit={}, randomizeFileListing={}",
+ numListstatusThreads, fileStatusLimit, randomizeFileListing);
copyFilter = CopyFilter.getCopyFilter(getConf());
copyFilter.initialize();
}
@@ -286,10 +289,8 @@ public class SimpleCopyListing extends CopyListing {
FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget());
if (sourceStatus.isDirectory()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding source dir for traverse: " +
- sourceStatus.getPath());
- }
+ LOG.debug("Adding source dir for traverse: {}",
+ sourceStatus.getPath());
HashSet<String> excludeList =
distCpSync.getTraverseExcludeList(diff.getSource(),
@@ -298,8 +299,9 @@ public class SimpleCopyListing extends CopyListing {
ArrayList<FileStatus> sourceDirs = new ArrayList<>();
sourceDirs.add(sourceStatus);
- traverseDirectory(fileListWriter, sourceFS, sourceDirs,
- sourceRoot, context, excludeList, fileStatuses);
+ new TraverseDirectory(fileListWriter, sourceFS, sourceDirs,
+ sourceRoot, context, excludeList, fileStatuses)
+ .traverseDirectory();
}
}
}
@@ -366,9 +368,8 @@ public class SimpleCopyListing extends CopyListing {
if (explore) {
ArrayList<FileStatus> sourceDirs = new ArrayList<FileStatus>();
for (FileStatus sourceStatus: sourceFiles) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
- }
+ LOG.debug("Recording source-path: {} for copy.",
+ sourceStatus.getPath());
LinkedList<CopyListingFileStatus> sourceCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
preserveAcls && sourceStatus.isDirectory(),
@@ -384,14 +385,13 @@ public class SimpleCopyListing extends CopyListing {
}
}
if (sourceStatus.isDirectory()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath());
- }
+ LOG.debug("Adding source dir for traverse: {}",
+ sourceStatus.getPath());
sourceDirs.add(sourceStatus);
}
}
- traverseDirectory(fileListWriter, sourceFS, sourceDirs,
- sourcePathRoot, context, null, statusList);
+ new TraverseDirectory(fileListWriter, sourceFS, sourceDirs,
+ sourcePathRoot, context, null, statusList).traverseDirectory();
}
}
if (randomizeFileListing) {
@@ -429,16 +429,12 @@ public class SimpleCopyListing extends CopyListing {
*/
Collections.shuffle(fileStatusInfoList, rnd);
for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
- }
+ LOG.debug("Adding {}", fileStatusInfo.fileStatus.getPath());
writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
fileStatusInfo.sourceRootPath);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Number of paths written to fileListing="
- + fileStatusInfoList.size());
- }
+ LOG.debug("Number of paths written to fileListing={}",
+ fileStatusInfoList.size());
fileStatusInfoList.clear();
}
@@ -590,8 +586,8 @@ public class SimpleCopyListing extends CopyListing {
result = new WorkReport<FileStatus[]>(getFileStatus(parent.getPath()),
retry, true);
} catch (FileNotFoundException fnf) {
- LOG.error("FileNotFoundException exception in listStatus: " +
- fnf.getMessage());
+ LOG.error("FileNotFoundException exception in listStatus: {}",
+ fnf.getMessage());
result = new WorkReport<FileStatus[]>(new FileStatus[0], retry, true,
fnf);
} catch (Exception e) {
@@ -605,8 +601,7 @@ public class SimpleCopyListing extends CopyListing {
}
private void printStats() {
- LOG.info("Paths (files+dirs) cnt = " + totalPaths +
- "; dirCnt = " + totalDirs);
+ LOG.info("Paths (files+dirs) cnt = {}; dirCnt = ", totalPaths, totalDirs);
}
private void maybePrintStats() {
@@ -615,79 +610,6 @@ public class SimpleCopyListing extends CopyListing {
}
}
- private void traverseDirectory(SequenceFile.Writer fileListWriter,
- FileSystem sourceFS,
- ArrayList<FileStatus> sourceDirs,
- Path sourcePathRoot,
- DistCpContext context,
- HashSet<String> excludeList,
- List<FileStatusInfo> fileStatuses)
- throws IOException {
- final boolean preserveAcls = context.shouldPreserve(FileAttribute.ACL);
- final boolean preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
- final boolean preserveRawXattrs = context.shouldPreserveRawXattrs();
-
- assert numListstatusThreads > 0;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting thread pool of " + numListstatusThreads +
- " listStatus workers.");
- }
- ProducerConsumer<FileStatus, FileStatus[]> workers =
- new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
- for (int i = 0; i < numListstatusThreads; i++) {
- workers.addWorker(
- new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
- excludeList));
- }
-
- for (FileStatus status : sourceDirs) {
- workers.put(new WorkRequest<FileStatus>(status, 0));
- }
-
- while (workers.hasWork()) {
- try {
- WorkReport<FileStatus[]> workResult = workers.take();
- int retry = workResult.getRetry();
- for (FileStatus child: workResult.getItem()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
- }
- if (workResult.getSuccess()) {
- LinkedList<CopyListingFileStatus> childCopyListingStatus =
- DistCpUtils.toCopyListingFileStatus(sourceFS, child,
- preserveAcls && child.isDirectory(),
- preserveXAttrs && child.isDirectory(),
- preserveRawXattrs && child.isDirectory(),
- context.getBlocksPerChunk());
-
- for (CopyListingFileStatus fs : childCopyListingStatus) {
- if (randomizeFileListing) {
- addToFileListing(fileStatuses,
- new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
- } else {
- writeToFileListing(fileListWriter, fs, sourcePathRoot);
- }
- }
- }
- if (retry < maxRetries) {
- if (child.isDirectory()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Traversing into source dir: " + child.getPath());
- }
- workers.put(new WorkRequest<FileStatus>(child, retry));
- }
- } else {
- LOG.error("Giving up on " + child.getPath() +
- " after " + retry + " retries.");
- }
- }
- } catch (InterruptedException ie) {
- LOG.error("Could not get item from childQueue. Retrying...");
- }
- }
- workers.shutdown();
- }
-
private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
LinkedList<CopyListingFileStatus> fileStatus, Path sourcePathRoot,
DistCpContext context) throws IOException {
@@ -697,9 +619,7 @@ public class SimpleCopyListing extends CopyListing {
if (fs.getPath().equals(sourcePathRoot) &&
fs.isDirectory() && syncOrOverwrite) {
// Skip the root-paths when syncOrOverwrite
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skip " + fs.getPath());
- }
+ LOG.debug("Skip {}", fs.getPath());
return;
}
writeToFileListing(fileListWriter, fs, sourcePathRoot);
@@ -709,10 +629,9 @@ public class SimpleCopyListing extends CopyListing {
private void writeToFileListing(SequenceFile.Writer fileListWriter,
CopyListingFileStatus fileStatus,
Path sourcePathRoot) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
- fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
- }
+ LOG.debug("REL PATH: {}, FULL PATH: {}",
+ DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()),
+ fileStatus.getPath());
if (!shouldCopy(fileStatus.getPath())) {
return;
@@ -730,4 +649,159 @@ public class SimpleCopyListing extends CopyListing {
totalPaths++;
maybePrintStats();
}
+
+ /**
+ * A utility class to traverse a directory.
+ */
+ private final class TraverseDirectory {
+
+ private SequenceFile.Writer fileListWriter;
+ private FileSystem sourceFS;
+ private ArrayList<FileStatus> sourceDirs;
+ private Path sourcePathRoot;
+ private DistCpContext context;
+ private HashSet<String> excludeList;
+ private List<FileStatusInfo> fileStatuses;
+ private final boolean preserveAcls;
+ private final boolean preserveXAttrs;
+ private final boolean preserveRawXattrs;
+
+ private TraverseDirectory(SequenceFile.Writer fileListWriter,
+ FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+ Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+ List<FileStatusInfo> fileStatuses) {
+ this.fileListWriter = fileListWriter;
+ this.sourceFS = sourceFS;
+ this.sourceDirs = sourceDirs;
+ this.sourcePathRoot = sourcePathRoot;
+ this.context = context;
+ this.excludeList = excludeList;
+ this.fileStatuses = fileStatuses;
+ this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+ this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+ this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+ }
+
+ public void traverseDirectory() throws IOException {
+ if (context.shouldUseIterator()) {
+ try (DurationInfo ignored = new DurationInfo(LOG,
+ "Building listing using iterator mode for %s", sourcePathRoot)) {
+ traverseDirectoryLegacy();
+ }
+ } else {
+ try (DurationInfo ignored = new DurationInfo(LOG,
+ "Building listing using multi threaded approach for %s",
+ sourcePathRoot)) {
+ traverseDirectoryMultiThreaded();
+ }
+ }
+ }
+
+ public void traverseDirectoryMultiThreaded() throws IOException {
+ assert numListstatusThreads > 0;
+
+ LOG.debug("Starting thread pool of {} listStatus workers.",
+ numListstatusThreads);
+
+ ProducerConsumer<FileStatus, FileStatus[]> workers =
+ new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+ try {
+ for (int i = 0; i < numListstatusThreads; i++) {
+ workers.addWorker(
+ new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+ excludeList));
+ }
+
+ for (FileStatus status : sourceDirs) {
+ workers.put(new WorkRequest<FileStatus>(status, 0));
+ }
+
+ while (workers.hasWork()) {
+ try {
+ WorkReport<FileStatus[]> workResult = workers.take();
+ int retry = workResult.getRetry();
+ for (FileStatus child : workResult.getItem()) {
+ LOG.debug("Recording source-path: {} for copy.", child.getPath());
+ boolean isChildDirectory = child.isDirectory();
+ if (workResult.getSuccess()) {
+ LinkedList<CopyListingFileStatus> childCopyListingStatus =
+ DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+ preserveAcls && isChildDirectory,
+ preserveXAttrs && isChildDirectory,
+ preserveRawXattrs && isChildDirectory,
+ context.getBlocksPerChunk());
+
+ for (CopyListingFileStatus fs : childCopyListingStatus) {
+ if (randomizeFileListing) {
+ addToFileListing(fileStatuses,
+ new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+ } else {
+ writeToFileListing(fileListWriter, fs, sourcePathRoot);
+ }
+ }
+ }
+ if (retry < maxRetries) {
+ if (isChildDirectory) {
+ LOG.debug("Traversing into source dir: {}", child.getPath());
+ workers.put(new WorkRequest<FileStatus>(child, retry));
+ }
+ } else {
+ LOG.error("Giving up on {} after {} retries.", child.getPath(),
+ retry);
+ }
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Could not get item from childQueue. Retrying...");
+ }
+ }
+ } finally {
+ workers.shutdown();
+ }
+ }
+
+ private void traverseDirectoryLegacy() throws IOException {
+ Stack<FileStatus> pathStack = new Stack<FileStatus>();
+ for (FileStatus fs : sourceDirs) {
+ if (excludeList == null || !excludeList
+ .contains(fs.getPath().toUri().getPath())) {
+ pathStack.add(fs);
+ }
+ }
+ while (!pathStack.isEmpty()) {
+ prepareListing(pathStack.pop().getPath());
+ }
+ }
+
+ private void prepareListing(Path path) throws IOException {
+ LOG.debug("Recording source-path: {} for copy.", path);
+ RemoteIterator<FileStatus> listStatus = RemoteIterators
+ .filteringRemoteIterator(sourceFS.listStatusIterator(path),
+ i -> excludeList == null || !excludeList
+ .contains(i.getPath().toUri().getPath()));
+ while (listStatus.hasNext()) {
+ FileStatus child = listStatus.next();
+ LinkedList<CopyListingFileStatus> childCopyListingStatus = DistCpUtils
+ .toCopyListingFileStatus(sourceFS, child,
+ preserveAcls && child.isDirectory(),
+ preserveXAttrs && child.isDirectory(),
+ preserveRawXattrs && child.isDirectory(),
+ context.getBlocksPerChunk());
+ for (CopyListingFileStatus fs : childCopyListingStatus) {
+ if (randomizeFileListing) {
+ addToFileListing(fileStatuses,
+ new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+ } else {
+ writeToFileListing(fileListWriter, fs, sourcePathRoot);
+ }
+ }
+ if (child.isDirectory()) {
+ LOG.debug("Traversing into source dir: {}", child.getPath());
+ prepareListing(child.getPath());
+ }
+ }
+ IOStatisticsLogging
+ .logIOStatisticsAtDebug(LOG, "RemoteIterator Statistics: {}",
+ listStatus);
+ }
+ }
}
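
The core pattern in prepareListing() above, shown in isolation (a sketch under assumptions: local filesystem, /tmp as a sample directory): listStatusIterator() lets stores that support it page through a directory instead of materialising the full FileStatus array, and filteringRemoteIterator() applies the exclude predicate lazily as entries stream in.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.functional.RemoteIterators;

public class FilteredListingSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // Entries arrive incrementally; only the current page is held in memory.
    RemoteIterator<FileStatus> it = RemoteIterators.filteringRemoteIterator(
        fs.listStatusIterator(new Path("/tmp")), // assumed sample directory
        st -> !st.getPath().getName().startsWith(".")); // skip hidden entries
    while (it.hasNext()) {
      System.out.println(it.next().getPath());
    }
  }
}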
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index e82d8bc..136b6c8 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -362,6 +362,7 @@ Command Line Options
| `-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B | |
| `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. |
| `-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store |
+| `-useiterator` | Uses a single-threaded listStatusIterator to build the listing | Useful for saving memory at the client side. If this option is used, the `numListstatusThreads` option is ignored |
Architecture of DistCp
----------------------
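
The equivalent of the command line `hadoop distcp -useiterator <src> <dst>` can be driven through ToolRunner; a minimal sketch (the cluster URIs are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.util.ToolRunner;

public class UseIteratorCliSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Returns 0 on success; the copy listing is built single-threaded.
    int rc = ToolRunner.run(conf, new DistCp(conf, null),
        new String[] {"-useiterator", "hdfs://nn1/src", "hdfs://nn2/dest"});
    System.exit(rc);
  }
}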
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
index 7382795..1349702 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
@@ -289,7 +289,7 @@ public class TestDistCpOptions {
"atomicWorkPath=null, logPath=null, sourceFileListing=abc, " +
"sourcePaths=null, targetPath=xyz, filtersFile='null', " +
"blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " +
- "directWrite=false}";
+ "directWrite=false, useiterator=false}";
String optionString = option.toString();
Assert.assertEquals(val, optionString);
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
index 14cce42..47b850f 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
@@ -48,9 +48,7 @@ import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.Timeout;
/**
* A JUnit test for copying files recursively.
@@ -60,9 +58,6 @@ public class TestDistCpSystem {
private static final Logger LOG =
LoggerFactory.getLogger(TestDistCpSystem.class);
- @Rule
- public Timeout globalTimeout = new Timeout(30000);
-
private static final String SRCDAT = "srcdat";
private static final String DSTDAT = "dstdat";
private static final long BLOCK_SIZE = 1024;
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
index 978ccdd..e0e103b 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
@@ -29,9 +29,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.tools.ECAdmin;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.util.DistCpTestUtils;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -69,6 +72,7 @@ public class TestDistCpWithRawXAttrs {
public static void init() throws Exception {
conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
+ conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true)
.build();
cluster.waitActive();
@@ -217,4 +221,23 @@ public class TestDistCpWithRawXAttrs {
assertTrue("/dest/dir1/subdir1 is not erasure coded!",
destSubDir1Status.isErasureCoded());
}
+
+ @Test
+ public void testUseIterator() throws Exception {
+
+ Path source = new Path("/src");
+ Path dest = new Path("/dest");
+ fs.delete(source, true);
+ fs.delete(dest, true);
+ // Create a source dir
+ fs.mkdirs(source);
+
+ GenericTestUtils.createFiles(fs, source, 3, 10, 10);
+
+ DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
+ dest.toString(), "-useiterator", conf);
+
+ Assertions.assertThat(RemoteIterators.toList(fs.listFiles(dest, true)))
+ .describedAs("files").hasSize(1110);
+ }
}
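
Where the 1110 in the assertion above comes from: createFiles(fs, source, 3, 10, 10) writes 10 files into every directory of a tree three levels deep with 10 subdirectories per level, i.e. 10 * (1 + 10 + 100). A tiny check:

public class FileCountCheck {
  public static void main(String[] args) {
    int total = 0;
    int dirsAtLevel = 1;                      // the source root
    for (int level = 0; level < 3; level++) { // depth = 3
      total += 10 * dirsAtLevel;              // 10 files per directory
      dirsAtLevel *= 10;                      // 10 subdirs per directory
    }
    System.out.println(total);                // 1110, the size asserted above
  }
}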
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
index 1a40d78..202ead6 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
@@ -44,7 +44,9 @@ import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.util.DistCpTestUtils;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -59,6 +61,7 @@ import org.slf4j.LoggerFactory;
* under test. The tests in the suite cover both copying from local to remote
* (e.g. a backup use case) and copying from remote to local (e.g. a restore use
* case).
+ * The HDFS contract test needs to be run explicitly.
*/
public abstract class AbstractContractDistCpTest
extends AbstractFSContractTestBase {
@@ -613,6 +616,42 @@ public abstract class AbstractContractDistCpTest
directWrite(localFS, localDir, remoteFS, remoteDir, false);
}
+ @Test
+ public void testDistCpWithIterator() throws Exception {
+ describe("Build listing in distCp using the iterator option.");
+ Path source = new Path(remoteDir, "src");
+ Path dest = new Path(localDir, "dest");
+ dest = localFS.makeQualified(dest);
+ mkdirs(remoteFS, source);
+ verifyPathExists(remoteFS, "", source);
+
+ GenericTestUtils
+ .createFiles(remoteFS, source, getDepth(), getWidth(), getWidth());
+
+ DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
+ dest.toString(), "-useiterator", conf);
+
+ Assertions
+ .assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
+ .describedAs("files").hasSize(getTotalFiles());
+ }
+
+ public int getDepth() {
+ return 3;
+ }
+
+ public int getWidth() {
+ return 10;
+ }
+
+ private int getTotalFiles() {
+ int totalFiles = 0;
+ for (int i = 1; i <= getDepth(); i++) {
+ totalFiles += Math.pow(getWidth(), i);
+ }
+ return totalFiles;
+ }
+
/**
* Executes a test with support for using direct write option.
*
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java
new file mode 100644
index 0000000..d8c7424
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.hdfs.HDFSContract;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+/**
+ * Verifies that HDFS passes all the tests in
+ * {@link AbstractContractDistCpTest}.
+ * As such, it acts as an in-module validation of this contract test itself.
+ */
+public class OptionalTestHDFSContractDistCp extends AbstractContractDistCpTest {
+
+ @BeforeClass
+ public static void createCluster() throws IOException {
+ HDFSContract.createCluster();
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws IOException {
+ HDFSContract.destroyCluster();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HDFSContract(conf);
+ }
+}
diff --git a/hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml b/hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml
new file mode 100644
index 0000000..3c9396f
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml
@@ -0,0 +1,139 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <!--
+ Here are most of the HDFS contract options.
+ -->
+
+ <property>
+ <name>fs.contract.test.root-tests-enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.file.contract.test.random-seek-count</name>
+ <value>500</value>
+ </property>
+
+ <property>
+ <name>fs.contract.is-case-sensitive</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-append</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-atomic-directory-delete</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-atomic-rename</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-block-locality</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-concat</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-seek</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.rejects-seek-past-eof</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-strict-exceptions</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-unix-permissions</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.rename-returns-false-if-dest-exists</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.rename-returns-false-if-source-missing</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-settimes</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-getfilestatus</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-file-reference</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-content-check</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-unbuffer</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-hflush</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-hsync</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.metadata_updated_on_hsync</name>
+ <value>false</value>
+ </property>
+
+ <!-- Disable min block size since most tests use tiny blocks -->
+ <property>
+ <name>dfs.namenode.fs-limits.min-block-size</name>
+ <value>0</value>
+ </property>
+</configuration>