You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/02/07 10:10:45 UTC

[hadoop] branch trunk updated: HADOOP-15281. Distcp to add no-rename copy option.

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new de804e5  HADOOP-15281. Distcp to add no-rename copy option.
de804e5 is described below

commit de804e53b9d20a2df75a4c7252bf83ed52011488
Author: Andrew Olson <an...@cerner.com>
AuthorDate: Thu Feb 7 10:05:58 2019 +0000

    HADOOP-15281. Distcp to add no-rename copy option.
    
    Contributed by Andrew Olson.
---
 .../fs/contract/s3a/ITestS3AContractDistCp.java    | 33 +++++++++++
 .../org/apache/hadoop/tools/DistCpConstants.java   |  3 +-
 .../org/apache/hadoop/tools/DistCpContext.java     |  4 ++
 .../apache/hadoop/tools/DistCpOptionSwitch.java    | 14 ++++-
 .../org/apache/hadoop/tools/DistCpOptions.java     | 19 ++++++
 .../org/apache/hadoop/tools/OptionsParser.java     |  4 +-
 .../org/apache/hadoop/tools/mapred/CopyMapper.java |  6 +-
 .../tools/mapred/RetriableFileCopyCommand.java     | 52 ++++++++++++-----
 .../hadoop-distcp/src/site/markdown/DistCp.md.vm   |  6 +-
 .../org/apache/hadoop/tools/TestDistCpOptions.java |  5 +-
 .../tools/contract/AbstractContractDistCpTest.java | 68 +++++++++++++++++++++-
 11 files changed, 191 insertions(+), 23 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index b3d511e..740f256 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -26,6 +27,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
 import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
 
@@ -74,4 +76,35 @@ public class ITestS3AContractDistCp extends AbstractContractDistCpTest {
     Path path = super.path(filepath);
     return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING);
   }
+
+  @Override
+  public void testDirectWrite() throws Exception {
+    resetStorageStatistics();
+    super.testDirectWrite();
+    assertEquals("Expected no renames for a direct write distcp", 0L,
+        getRenameOperationCount());
+  }
+
+  @Override
+  public void testNonDirectWrite() throws Exception {
+    resetStorageStatistics();
+    try {
+      super.testNonDirectWrite();
+    } catch (FileNotFoundException e) {
+      // We may get this exception when data is written to a DELAY_LISTING_ME
+      // directory causing verification of the distcp success to fail if
+      // S3Guard is not enabled
+    }
+    assertEquals("Expected 2 renames for a non-direct write distcp", 2L,
+        getRenameOperationCount());
+  }
+
+  private void resetStorageStatistics() {
+    getFileSystem().getStorageStatistics().reset();
+  }
+
+  private long getRenameOperationCount() {
+    return getFileSystem().getStorageStatistics()
+        .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME);
+  }
 }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 4946091..e20f206 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -85,7 +85,8 @@ public final class DistCpConstants {
       "distcp.dynamic.min.records_per_chunk";
   public static final String CONF_LABEL_SPLIT_RATIO =
       "distcp.dynamic.split.ratio";
-  
+  public static final String CONF_LABEL_DIRECT_WRITE = "distcp.direct.write";
+
   /* Total bytes to be copied. Updated by copylisting. Unfiltered count */
   public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";
 
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
index fc047ca..1e63d80 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
@@ -179,6 +179,10 @@ public class DistCpContext {
     return options.getCopyBufferSize();
   }
 
+  public boolean shouldDirectWrite() {
+    return options.shouldDirectWrite();
+  }
+
   public void setTargetPathExists(boolean targetPathExists) {
     this.targetPathExists = targetPathExists;
   }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index e57e413..49ffc59 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -223,7 +223,19 @@ public enum DistCpOptionSwitch {
    */
   FILTERS(DistCpConstants.CONF_LABEL_FILTERS_FILE,
       new Option("filters", true, "The path to a file containing a list of"
-          + " strings for paths to be excluded from the copy."));
+          + " strings for paths to be excluded from the copy.")),
+
+  /**
+   * Write directly to the final location, avoiding the creation and rename
+   * of temporary files.
+   * This is typically useful in cases where the target filesystem
+   * implementation does not support atomic rename operations, such as with
+   * the S3AFileSystem which translates file renames to potentially very
+   * expensive copy-then-delete operations.
+   */
+  DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE,
+      new Option("direct", false, "Write files directly to the"
+          + " target location, avoiding temporary file rename."));
 
 
   public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct";
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index aca5d0e..4a6552f 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -155,6 +155,9 @@ public final class DistCpOptions {
 
   private final int copyBufferSize;
 
+  /** Whether data should be written directly to the target paths. */
+  private final boolean directWrite;
+
   /**
    * File attributes for preserve.
    *
@@ -216,6 +219,8 @@ public final class DistCpOptions {
     this.copyBufferSize = builder.copyBufferSize;
     this.verboseLog = builder.verboseLog;
     this.trackPath = builder.trackPath;
+
+    this.directWrite = builder.directWrite;
   }
 
   public Path getSourceFileListing() {
@@ -343,6 +348,10 @@ public final class DistCpOptions {
     return trackPath;
   }
 
+  public boolean shouldDirectWrite() {
+    return directWrite;
+  }
+
   /**
    * Add options to configuration. These will be used in the Mapper/committer
    *
@@ -391,6 +400,8 @@ public final class DistCpOptions {
       DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.NUM_LISTSTATUS_THREADS,
           Integer.toString(numListstatusThreads));
     }
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE,
+            String.valueOf(directWrite));
   }
 
   /**
@@ -427,6 +438,7 @@ public final class DistCpOptions {
         ", blocksPerChunk=" + blocksPerChunk +
         ", copyBufferSize=" + copyBufferSize +
         ", verboseLog=" + verboseLog +
+        ", directWrite=" + directWrite +
         '}';
   }
 
@@ -476,6 +488,8 @@ public final class DistCpOptions {
     private int copyBufferSize =
             DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
 
+    private boolean directWrite = false;
+
     public Builder(List<Path> sourcePaths, Path targetPath) {
       Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
           "Source paths should not be null or empty!");
@@ -728,6 +742,11 @@ public final class DistCpOptions {
       this.verboseLog = newVerboseLog;
       return this;
     }
+
+    public Builder withDirectWrite(boolean newDirectWrite) {
+      this.directWrite = newDirectWrite;
+      return this;
+    }
   }
 
 }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 83c6ff3..3b9d13b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -113,7 +113,9 @@ public class OptionsParser {
         .withBlocking(
             !command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch()))
         .withVerboseLog(
-            command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch()));
+            command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch()))
+        .withDirectWrite(
+            command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()));
 
     if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
       String[] snapshots = getVals(command,
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
index 63a61b8..336779e 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -84,6 +84,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   private boolean overWrite = false;
   private boolean append = false;
   private boolean verboseLog = false;
+  private boolean directWrite = false;
   private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
 
   private FileSystem targetFS = null;
@@ -111,6 +112,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
         DistCpOptionSwitch.VERBOSE_LOG.getConfigLabel(), false);
     preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
         PRESERVE_STATUS.getConfigLabel()));
+    directWrite = conf.getBoolean(
+        DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
 
     targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
     Path targetFinalPath = new Path(conf.get(
@@ -253,7 +256,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     long bytesCopied;
     try {
       bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
-          action).execute(sourceFileStatus, target, context, fileAttributes);
+          action, directWrite).execute(sourceFileStatus, target, context,
+              fileAttributes);
     } catch (Exception e) {
       context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
       throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 51579bc..db21f64 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -55,6 +55,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
 
   private static Logger LOG = LoggerFactory.getLogger(RetriableFileCopyCommand.class);
   private boolean skipCrc = false;
+  private boolean directWrite = false;
   private FileAction action;
 
   /**
@@ -80,6 +81,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
   }
 
   /**
+   * Create a RetriableFileCopyCommand.
+   *
+   * @param skipCrc Whether to skip the crc check.
+   * @param description A verbose description of the copy operation.
+   * @param action We should overwrite the target file or append new data to it.
+   * @param directWrite Whether to write directly to the target path, avoiding a
+   *                    temporary file rename.
+   */
+  public RetriableFileCopyCommand(boolean skipCrc, String description,
+          FileAction action, boolean directWrite) {
+    this(skipCrc, description, action);
+    this.directWrite = directWrite;
+  }
+
+  /**
    * Implementation of RetriableCommand::doExecute().
    * This is the actual copy-implementation.
    * @param arguments Argument-list to the command.
@@ -102,16 +118,19 @@ public class RetriableFileCopyCommand extends RetriableCommand {
   private long doCopy(CopyListingFileStatus source, Path target,
       Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
       throws IOException {
+    LOG.info("Copying {} to {}", source.getPath(), target);
+
     final boolean toAppend = action == FileAction.APPEND;
-    Path targetPath = toAppend ? target : getTmpFile(target, context);
+    final boolean useTempTarget = !toAppend && !directWrite;
+    Path targetPath = useTempTarget ? getTempFile(target, context) : target;
+
+    LOG.info("Writing to {} target file path {}", useTempTarget ? "temporary"
+        : "direct", targetPath);
+
     final Configuration configuration = context.getConfiguration();
     FileSystem targetFS = target.getFileSystem(configuration);
 
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Copying " + source.getPath() + " to " + target);
-        LOG.debug("Target file path: " + targetPath);
-      }
       final Path sourcePath = source.getPath();
       final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
       final FileChecksum sourceChecksum = fileAttributes
@@ -134,17 +153,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
               targetFS, targetPath);
         }
       }
-      // it's not append case, thus we first write to a temporary file, rename
-      // it to the target path.
-      if (!toAppend) {
+      // Neither an append nor a direct write (preferred for s3a): the data was
+      // written to a temporary file, which is now renamed to the target path.
+      if (useTempTarget) {
+        LOG.info("Renaming temporary target file path {} to {}", targetPath,
+            target);
         promoteTmpToTarget(targetPath, target, targetFS);
       }
+      LOG.info("Completed writing {} ({} bytes)", target, bytesRead);
       return bytesRead;
     } finally {
       // note that for append case, it is possible that we append partial data
       // and then fail. In that case, for the next retry, we either reuse the
       // partial appended data if it is good or we overwrite the whole file
-      if (!toAppend) {
+      if (useTempTarget) {
         targetFS.delete(targetPath, false);
       }
     }
@@ -252,14 +274,16 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     }
   }
 
-  private Path getTmpFile(Path target, Mapper.Context context) {
+  private Path getTempFile(Path target, Mapper.Context context) {
     Path targetWorkPath = new Path(context.getConfiguration().
         get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
 
-    Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
-    LOG.info("Creating temp file: " +
-        new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()));
-    return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
+    Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent()
+        : targetWorkPath;
+    Path tempFile = new Path(root, ".distcp.tmp." +
+        context.getTaskAttemptID().toString());
+    LOG.info("Creating temp file: {}", tempFile);
+    return tempFile;
   }
 
   @VisibleForTesting
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index b855422..25ea7e2 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -241,6 +241,7 @@ Flag              | Description                          | Notes
 `-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements [...]
 `-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B |
 `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option.
+`-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store
 
 Architecture of DistCp
 ----------------------
@@ -455,7 +456,7 @@ configuration, or be otherwise available in all cluster hosts.
 DistCp can be used to upload data
 
 ```bash
-hadoop distcp hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1
+hadoop distcp -direct hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1
 ```
 
 To download data
@@ -535,6 +536,9 @@ rely on disk buffering.
     Copies each byte down to the Hadoop worker nodes and back to the
 bucket. As well as being slow, it means that charges may be incurred.
 
+* The `-direct` option can be used to write to object store target paths directly,
+avoiding the potentially very expensive temporary file rename operations that would
+otherwise occur.
 
 Frequently Asked Questions
 --------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
index 62a2e6d..7382795 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
@@ -287,8 +287,9 @@ public class TestDistCpOptions {
         "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " +
         "mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " +
         "atomicWorkPath=null, logPath=null, sourceFileListing=abc, " +
-        "sourcePaths=null, targetPath=xyz, filtersFile='null'," +
-        " blocksPerChunk=0, copyBufferSize=8192, verboseLog=false}";
+        "sourcePaths=null, targetPath=xyz, filtersFile='null', " +
+        "blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " +
+        "directWrite=false}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
index 0757a66..eeaf30a 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
@@ -552,7 +552,7 @@ public abstract class AbstractContractDistCpTest
 
   /**
    * Run the distcp job.
-   * @param optons distcp options
+   * @param options distcp options
    * @return the job. It will have already completed.
    * @throws Exception failure
    */
@@ -586,4 +586,68 @@ public abstract class AbstractContractDistCpTest
   private static void mkdirs(FileSystem fs, Path dir) throws Exception {
     assertTrue("Failed to mkdir " + dir, fs.mkdirs(dir));
   }
-}
+
+  @Test
+  public void testDirectWrite() throws Exception {
+    describe("copy file from local to remote using direct write option");
+    directWrite(localFS, localDir, remoteFS, remoteDir, true);
+  }
+
+  @Test
+  public void testNonDirectWrite() throws Exception {
+    describe("copy file from local to remote without using direct write " +
+        "option");
+    directWrite(localFS, localDir, remoteFS, remoteDir, false);
+  }
+
+  /**
+   * Executes a test with support for using direct write option.
+   *
+   * @param srcFS source FileSystem
+   * @param srcDir source directory
+   * @param dstFS destination FileSystem
+   * @param dstDir destination directory
+   * @param directWrite whether to use the -direct option
+   * @throws Exception if there is a failure
+   */
+  private void directWrite(FileSystem srcFS, Path srcDir, FileSystem dstFS,
+          Path dstDir, boolean directWrite) throws Exception {
+    initPathFields(srcDir, dstDir);
+
+    // Create 2 test files
+    mkdirs(srcFS, inputSubDir1);
+    byte[] data1 = dataset(64, 33, 43);
+    createFile(srcFS, inputFile1, true, data1);
+    byte[] data2 = dataset(200, 43, 53);
+    createFile(srcFS, inputFile2, true, data2);
+    Path target = new Path(dstDir, "outputDir");
+    if (directWrite) {
+      runDistCpDirectWrite(inputDir, target);
+    } else {
+      runDistCp(inputDir, target);
+    }
+    ContractTestUtils.assertIsDirectory(dstFS, target);
+    lsR("Destination tree after distcp", dstFS, target);
+
+    // Verify copied file contents
+    verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
+    verifyFileContents(dstFS, new Path(target, "inputDir/subDir1/file2"),
+        data2);
+  }
+
+  /**
+   * Run distcp -direct srcDir destDir.
+   * @param srcDir local source directory
+   * @param destDir remote destination directory
+   * @return the completed job
+   * @throws Exception any failure.
+   */
+  private Job runDistCpDirectWrite(final Path srcDir, final Path destDir)
+          throws Exception {
+    describe("\nDistcp -direct from " + srcDir + " to " + destDir);
+    return runDistCp(buildWithStandardOptions(
+            new DistCpOptions.Builder(
+                    Collections.singletonList(srcDir), destDir)
+                    .withDirectWrite(true)));
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org