You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2019/02/07 23:19:08 UTC
[hadoop] branch branch-3.1 updated: HADOOP-15281. Distcp to add
no-rename copy option. Contributed by Andrew Olson.
This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 49d5463 HADOOP-15281. Distcp to add no-rename copy option. Contributed by Andrew Olson.
49d5463 is described below
commit 49d54633e0f4bd388c00d591e90666dbb7633c9f
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Thu Feb 7 23:15:18 2019 +0000
HADOOP-15281. Distcp to add no-rename copy option.
Contributed by Andrew Olson.
---
.../fs/contract/s3a/ITestS3AContractDistCp.java | 33 +++++++++++
.../org/apache/hadoop/tools/DistCpConstants.java | 3 +-
.../org/apache/hadoop/tools/DistCpContext.java | 4 ++
.../apache/hadoop/tools/DistCpOptionSwitch.java | 14 ++++-
.../org/apache/hadoop/tools/DistCpOptions.java | 19 ++++++
.../org/apache/hadoop/tools/OptionsParser.java | 4 +-
.../org/apache/hadoop/tools/mapred/CopyMapper.java | 6 +-
.../tools/mapred/RetriableFileCopyCommand.java | 53 ++++++++++++-----
.../hadoop-distcp/src/site/markdown/DistCp.md.vm | 6 +-
.../org/apache/hadoop/tools/TestDistCpOptions.java | 5 +-
.../tools/contract/AbstractContractDistCpTest.java | 68 +++++++++++++++++++++-
11 files changed, 192 insertions(+), 23 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index b3d511e..740f256 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.contract.s3a;
+import java.io.FileNotFoundException;
import java.io.IOException;
import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -26,6 +27,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
@@ -74,4 +76,35 @@ public class ITestS3AContractDistCp extends AbstractContractDistCpTest {
Path path = super.path(filepath);
return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING);
}
+
+ @Override
+ public void testDirectWrite() throws Exception {
+ resetStorageStatistics();
+ super.testDirectWrite();
+ assertEquals("Expected no renames for a direct write distcp", 0L,
+ getRenameOperationCount());
+ }
+
+ @Override
+ public void testNonDirectWrite() throws Exception {
+ resetStorageStatistics();
+ try {
+ super.testNonDirectWrite();
+ } catch (FileNotFoundException e) {
+ // We may get this exception when data is written to a DELAY_LISTING_ME
+ // directory causing verification of the distcp success to fail if
+ // S3Guard is not enabled
+ }
+ assertEquals("Expected 2 renames for a non-direct write distcp", 2L,
+ getRenameOperationCount());
+ }
+
+ private void resetStorageStatistics() {
+ getFileSystem().getStorageStatistics().reset();
+ }
+
+ private long getRenameOperationCount() {
+ return getFileSystem().getStorageStatistics()
+ .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME);
+ }
}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 4946091..e20f206 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -85,7 +85,8 @@ public final class DistCpConstants {
"distcp.dynamic.min.records_per_chunk";
public static final String CONF_LABEL_SPLIT_RATIO =
"distcp.dynamic.split.ratio";
-
+ public static final String CONF_LABEL_DIRECT_WRITE = "distcp.direct.write";
+
/* Total bytes to be copied. Updated by copylisting. Unfiltered count */
public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
index fc047ca..1e63d80 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
@@ -179,6 +179,10 @@ public class DistCpContext {
return options.getCopyBufferSize();
}
+ public boolean shouldDirectWrite() {
+ return options.shouldDirectWrite();
+ }
+
public void setTargetPathExists(boolean targetPathExists) {
this.targetPathExists = targetPathExists;
}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index e57e413..49ffc59 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -223,7 +223,19 @@ public enum DistCpOptionSwitch {
*/
FILTERS(DistCpConstants.CONF_LABEL_FILTERS_FILE,
new Option("filters", true, "The path to a file containing a list of"
- + " strings for paths to be excluded from the copy."));
+ + " strings for paths to be excluded from the copy.")),
+
+ /**
+ * Write directly to the final location, avoiding the creation and rename
+ * of temporary files.
+ * This is typically useful in cases where the target filesystem
+ * implementation does not support atomic rename operations, such as with
+ * the S3AFileSystem which translates file renames to potentially very
+ * expensive copy-then-delete operations.
+ */
+ DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE,
+ new Option("direct", false, "Write files directly to the"
+ + " target location, avoiding temporary file rename."));
public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct";
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index cff04eb..f5a72bf 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -155,6 +155,9 @@ public final class DistCpOptions {
private final int copyBufferSize;
+ /** Whether data should be written directly to the target paths. */
+ private final boolean directWrite;
+
/**
* File attributes for preserve.
*
@@ -216,6 +219,8 @@ public final class DistCpOptions {
this.copyBufferSize = builder.copyBufferSize;
this.verboseLog = builder.verboseLog;
this.trackPath = builder.trackPath;
+
+ this.directWrite = builder.directWrite;
}
public Path getSourceFileListing() {
@@ -343,6 +348,10 @@ public final class DistCpOptions {
return trackPath;
}
+ public boolean shouldDirectWrite() {
+ return directWrite;
+ }
+
/**
* Add options to configuration. These will be used in the Mapper/committer
*
@@ -391,6 +400,8 @@ public final class DistCpOptions {
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.NUM_LISTSTATUS_THREADS,
Integer.toString(numListstatusThreads));
}
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE,
+ String.valueOf(directWrite));
}
/**
@@ -427,6 +438,7 @@ public final class DistCpOptions {
", blocksPerChunk=" + blocksPerChunk +
", copyBufferSize=" + copyBufferSize +
", verboseLog=" + verboseLog +
+ ", directWrite=" + directWrite +
'}';
}
@@ -476,6 +488,8 @@ public final class DistCpOptions {
private int copyBufferSize =
DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
+ private boolean directWrite = false;
+
public Builder(List<Path> sourcePaths, Path targetPath) {
Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
"Source paths should not be null or empty!");
@@ -728,6 +742,11 @@ public final class DistCpOptions {
this.verboseLog = newVerboseLog;
return this;
}
+
+ public Builder withDirectWrite(boolean newDirectWrite) {
+ this.directWrite = newDirectWrite;
+ return this;
+ }
}
}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 668b594..ef0017b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -113,7 +113,9 @@ public class OptionsParser {
.withBlocking(
!command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch()))
.withVerboseLog(
- command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch()));
+ command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch()))
+ .withDirectWrite(
+ command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()));
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
String[] snapshots = getVals(command,
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
index faa4aa2..6b0f1f1 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -84,6 +84,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
private boolean overWrite = false;
private boolean append = false;
private boolean verboseLog = false;
+ private boolean directWrite = false;
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
private FileSystem targetFS = null;
@@ -111,6 +112,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
DistCpOptionSwitch.VERBOSE_LOG.getConfigLabel(), false);
preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
PRESERVE_STATUS.getConfigLabel()));
+ directWrite = conf.getBoolean(
+ DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path targetFinalPath = new Path(conf.get(
@@ -253,7 +256,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
long bytesCopied;
try {
bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
- action).execute(sourceFileStatus, target, context, fileAttributes);
+ action, directWrite).execute(sourceFileStatus, target, context,
+ fileAttributes);
} catch (Exception e) {
context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 55f90d0..25c1de1 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -55,6 +55,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
private boolean skipCrc = false;
+ private boolean directWrite = false;
private FileAction action;
/**
@@ -80,6 +81,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
}
/**
+ * Create a RetriableFileCopyCommand that may bypass the temporary file.
+ *
+ * @param skipCrc Whether to skip the crc check.
+ * @param description A verbose description of the copy operation.
+ * @param action Whether to overwrite the target file or append new data to it.
+ * @param directWrite Whether to write directly to the target path, avoiding a
+ * temporary file and its rename (append always writes in place anyway).
+ */
+ public RetriableFileCopyCommand(boolean skipCrc, String description,
+ FileAction action, boolean directWrite) {
+ this(skipCrc, description, action);
+ this.directWrite = directWrite;
+ }
+
+ /**
* Implementation of RetriableCommand::doExecute().
* This is the actual copy-implementation.
* @param arguments Argument-list to the command.
@@ -102,16 +118,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
private long doCopy(CopyListingFileStatus source, Path target,
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
throws IOException {
+ LOG.info("Copying " + source.getPath() + " to " + target);
+
final boolean toAppend = action == FileAction.APPEND;
- Path targetPath = toAppend ? target : getTmpFile(target, context);
+ final boolean useTempTarget = !toAppend && !directWrite;
+ Path targetPath = useTempTarget ? getTempFile(target, context) : target;
+
+ LOG.info("Writing to "
+ + (useTempTarget ? "temporary" : "direct")
+ + " target file path " + targetPath);
+
final Configuration configuration = context.getConfiguration();
FileSystem targetFS = target.getFileSystem(configuration);
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Copying " + source.getPath() + " to " + target);
- LOG.debug("Target file path: " + targetPath);
- }
final Path sourcePath = source.getPath();
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
final FileChecksum sourceChecksum = fileAttributes
@@ -134,17 +154,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
targetFS, targetPath);
}
}
- // it's not append case, thus we first write to a temporary file, rename
- // it to the target path.
- if (!toAppend) {
+ // it's not append or direct write (preferred for s3a) case, thus we first
+ // write to a temporary file, then rename it to the target path.
+ if (useTempTarget) {
+ LOG.info("Renaming temporary target file path " + targetPath
+ + " to " + target);
promoteTmpToTarget(targetPath, target, targetFS);
}
+ LOG.info("Completed writing " + target + " (" + bytesRead + " bytes)");
return bytesRead;
} finally {
// note that for append case, it is possible that we append partial data
// and then fail. In that case, for the next retry, we either reuse the
// partial appended data if it is good or we overwrite the whole file
- if (!toAppend) {
+ if (useTempTarget) {
targetFS.delete(targetPath, false);
}
}
@@ -252,14 +275,16 @@ public class RetriableFileCopyCommand extends RetriableCommand {
}
}
- private Path getTmpFile(Path target, Mapper.Context context) {
+ private Path getTempFile(Path target, Mapper.Context context) {
Path targetWorkPath = new Path(context.getConfiguration().
get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
- Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
- LOG.info("Creating temp file: " +
- new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()));
- return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
+ Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent()
+ : targetWorkPath;
+ Path tempFile = new Path(root, ".distcp.tmp." +
+ context.getTaskAttemptID().toString());
+ LOG.info("Creating temp file: " + tempFile);
+ return tempFile;
}
@VisibleForTesting
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index b855422..25ea7e2 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -241,6 +241,7 @@ Flag | Description | Notes
`-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements [...]
`-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B |
`-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option.
+`-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store
Architecture of DistCp
----------------------
@@ -455,7 +456,7 @@ configuration, or be otherwise available in all cluster hosts.
DistCp can be used to upload data
```bash
-hadoop distcp hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1
+hadoop distcp -direct hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1
```
To download data
@@ -535,6 +536,9 @@ rely on disk buffering.
Copies each byte down to the Hadoop worker nodes and back to the
bucket. As well as being slow, it means that charges may be incurred.
+* The `-direct` option can be used to write to object store target paths directly,
+avoiding the potentially very expensive temporary file rename operations that would
+otherwise occur.
Frequently Asked Questions
--------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
index 62a2e6d..7382795 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
@@ -287,8 +287,9 @@ public class TestDistCpOptions {
"skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " +
"mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " +
"atomicWorkPath=null, logPath=null, sourceFileListing=abc, " +
- "sourcePaths=null, targetPath=xyz, filtersFile='null'," +
- " blocksPerChunk=0, copyBufferSize=8192, verboseLog=false}";
+ "sourcePaths=null, targetPath=xyz, filtersFile='null', " +
+ "blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " +
+ "directWrite=false}";
String optionString = option.toString();
Assert.assertEquals(val, optionString);
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
index 1458991..c8a1d7e 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
@@ -552,7 +552,7 @@ public abstract class AbstractContractDistCpTest
/**
* Run the distcp job.
- * @param optons distcp options
+ * @param options distcp options
* @return the job. It will have already completed.
* @throws Exception failure
*/
@@ -586,4 +586,68 @@ public abstract class AbstractContractDistCpTest
private static void mkdirs(FileSystem fs, Path dir) throws Exception {
assertTrue("Failed to mkdir " + dir, fs.mkdirs(dir));
}
-}
+
+ @Test
+ public void testDirectWrite() throws Exception {
+ describe("copy file from local to remote using direct write option");
+ directWrite(localFS, localDir, remoteFS, remoteDir, true);
+ }
+
+ @Test
+ public void testNonDirectWrite() throws Exception {
+ describe("copy file from local to remote without using direct write " +
+ "option");
+ directWrite(localFS, localDir, remoteFS, remoteDir, false);
+ }
+
+ /**
+ * Distcp two test files into dstDir/outputDir and verify their contents.
+ *
+ * @param srcFS source FileSystem
+ * @param srcDir source directory
+ * @param dstFS destination FileSystem
+ * @param dstDir destination directory
+ * @param directWrite whether to use the -direct option
+ * @throws Exception if there is a failure
+ */
+ private void directWrite(FileSystem srcFS, Path srcDir, FileSystem dstFS,
+ Path dstDir, boolean directWrite) throws Exception {
+ initPathFields(srcDir, dstDir);
+
+ // Create 2 test files
+ mkdirs(srcFS, inputSubDir1);
+ byte[] data1 = dataset(64, 33, 43);
+ createFile(srcFS, inputFile1, true, data1);
+ byte[] data2 = dataset(200, 43, 53);
+ createFile(srcFS, inputFile2, true, data2);
+ Path target = new Path(dstDir, "outputDir");
+ if (directWrite) {
+ runDistCpDirectWrite(inputDir, target);
+ } else {
+ runDistCp(inputDir, target);
+ }
+ ContractTestUtils.assertIsDirectory(dstFS, target);
+ lsR("Destination tree after distcp", dstFS, target);
+
+ // Verify copied file contents
+ verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
+ verifyFileContents(dstFS, new Path(target, "inputDir/subDir1/file2"),
+ data2);
+ }
+
+ /**
+ * Run distcp -direct srcDir destDir.
+ * @param srcDir local source directory
+ * @param destDir remote destination directory
+ * @return the completed job
+ * @throws Exception any failure.
+ */
+ private Job runDistCpDirectWrite(final Path srcDir, final Path destDir)
+ throws Exception {
+ describe("\nDistcp -direct from " + srcDir + " to " + destDir);
+ return runDistCp(buildWithStandardOptions(
+ new DistCpOptions.Builder(
+ Collections.singletonList(srcDir), destDir)
+ .withDirectWrite(true)));
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org