You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/08/22 17:48:38 UTC
[hadoop] branch trunk updated: HADOOP-17122: Preserving Directory Attributes in DistCp with Atomic Copy (#2133)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 872c290 HADOOP-17122: Preserving Directory Attributes in DistCp with Atomic Copy (#2133)
872c290 is described below
commit 872c2909bdc636ec2c7da3f94b9e07348e3a6f0f
Author: swamirishi <47...@users.noreply.github.com>
AuthorDate: Sat Aug 22 23:18:21 2020 +0530
HADOOP-17122: Preserving Directory Attributes in DistCp with Atomic Copy (#2133)
Contributed by Swaminathan Balachandran
---
.../apache/hadoop/tools/mapred/CopyCommitter.java | 6 +-
.../hadoop/tools/mapred/TestCopyCommitter.java | 69 +++++++++++++++++-----
2 files changed, 59 insertions(+), 16 deletions(-)
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 139bd08..e346d0b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -318,8 +318,10 @@ public class CopyCommitter extends FileOutputCommitter {
SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(sourceListing));
long totalLen = clusterFS.getFileStatus(sourceListing).getLen();
-
- Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+ // For Atomic Copy the Final & Work Path are different & atomic copy has
+ // already moved it to final path.
+ Path targetRoot =
+ new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
long preservedEntries = 0;
try {
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index 11118c1f..685f030 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -53,6 +53,8 @@ import java.io.IOException;
import java.util.*;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH;
+import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_WORK_PATH;
import static org.apache.hadoop.tools.util.TestDistCpUtils.*;
public class TestCopyCommitter {
@@ -160,10 +162,10 @@ public class TestCopyCommitter {
context.setTargetPathExists(false);
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
- Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+ Path listingFile = new Path("/tmp1/" + rand.nextLong());
listing.buildListing(listingFile, context);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
committer.commitJob(jobContext);
checkDirectoryPermissions(fs, targetBase, sourcePerm);
@@ -180,6 +182,45 @@ public class TestCopyCommitter {
}
@Test
+ public void testPreserveStatusWithAtomicCommit() throws IOException {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(
+ taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+ String sourceBase;
+ String workBase;
+ String targetBase;
+ FileSystem fs = null;
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ fs = FileSystem.get(conf);
+ FsPermission sourcePerm = new FsPermission((short) 511);
+ FsPermission initialPerm = new FsPermission((short) 448);
+ sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
+ workBase = TestDistCpUtils.createTestSetup(fs, initialPerm);
+ targetBase = "/tmp1/" + rand.nextLong();
+ final DistCpOptions options = new DistCpOptions.Builder(
+ Collections.singletonList(new Path(sourceBase)), new Path("/out"))
+ .preserve(FileAttribute.PERMISSION).build();
+ options.appendToConf(conf);
+ final DistCpContext context = new DistCpContext(options);
+ context.setTargetPathExists(false);
+ CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+ Path listingFile = new Path("/tmp1/" + rand.nextLong());
+ listing.buildListing(listingFile, context);
+ conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_WORK_PATH, workBase);
+ conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
+ committer.commitJob(jobContext);
+ checkDirectoryPermissions(fs, targetBase, sourcePerm);
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp1");
+ conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+ }
+ }
+
+ @Test
public void testDeleteMissing() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
@@ -207,8 +248,8 @@ public class TestCopyCommitter {
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
committer.commitJob(jobContext);
verifyFoldersAreInSync(fs, targetBase, sourceBase);
@@ -256,8 +297,8 @@ public class TestCopyCommitter {
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
Path sourceListing = new Path(
conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
@@ -320,8 +361,8 @@ public class TestCopyCommitter {
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
committer.commitJob(jobContext);
verifyFoldersAreInSync(fs, targetBase, sourceBase);
@@ -353,8 +394,8 @@ public class TestCopyCommitter {
fs = FileSystem.get(conf);
fs.mkdirs(new Path(workPath));
- conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+ conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
+ conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
assertPathExists(fs, "Work path", new Path(workPath));
@@ -391,8 +432,8 @@ public class TestCopyCommitter {
fs.mkdirs(new Path(workPath));
fs.mkdirs(new Path(finalPath));
- conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+ conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
+ conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
assertPathExists(fs, "Work path", new Path(workPath));
@@ -463,8 +504,8 @@ public class TestCopyCommitter {
+ String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
- conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+ conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
OutputCommitter committer = new CopyCommitter(
null, taskAttemptContext);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org