You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/08/22 17:48:38 UTC

[hadoop] branch trunk updated: HADOOP-17122: Preserving Directory Attributes in DistCp with Atomic Copy (#2133)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 872c290  HADOOP-17122: Preserving Directory Attributes in DistCp with Atomic Copy (#2133)
872c290 is described below

commit 872c2909bdc636ec2c7da3f94b9e07348e3a6f0f
Author: swamirishi <47...@users.noreply.github.com>
AuthorDate: Sat Aug 22 23:18:21 2020 +0530

    HADOOP-17122: Preserving Directory Attributes in DistCp with Atomic Copy (#2133)
    
    
    Contributed by Swaminathan Balachandran
---
 .../apache/hadoop/tools/mapred/CopyCommitter.java  |  6 +-
 .../hadoop/tools/mapred/TestCopyCommitter.java     | 69 +++++++++++++++++-----
 2 files changed, 59 insertions(+), 16 deletions(-)

diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 139bd08..e346d0b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -318,8 +318,10 @@ public class CopyCommitter extends FileOutputCommitter {
     SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
                                       SequenceFile.Reader.file(sourceListing));
     long totalLen = clusterFS.getFileStatus(sourceListing).getLen();
-
-    Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+    // For Atomic Copy the Final & Work Path are different & atomic copy has
+    // already moved it to final path.
+    Path targetRoot =
+            new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
 
     long preservedEntries = 0;
     try {
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index 11118c1f..685f030 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -53,6 +53,8 @@ import java.io.IOException;
 import java.util.*;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH;
+import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_WORK_PATH;
 import static org.apache.hadoop.tools.util.TestDistCpUtils.*;
 
 public class TestCopyCommitter {
@@ -160,10 +162,10 @@ public class TestCopyCommitter {
       context.setTargetPathExists(false);
 
       CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
-      Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+      Path listingFile = new Path("/tmp1/" + rand.nextLong());
       listing.buildListing(listingFile, context);
 
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
 
       committer.commitJob(jobContext);
       checkDirectoryPermissions(fs, targetBase, sourcePerm);
@@ -180,6 +182,45 @@ public class TestCopyCommitter {
   }
 
   @Test
+  public void testPreserveStatusWithAtomicCommit() throws IOException { // permission-preserve check with atomic copy enabled and work path != final path
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobContext jobContext = new JobContextImpl(
+                            taskAttemptContext.getConfiguration(),
+                            taskAttemptContext.getTaskAttemptID().getJobID());
+    Configuration conf = jobContext.getConfiguration();
+    String sourceBase;
+    String workBase;
+    String targetBase;
+    FileSystem fs = null;
+    try {
+      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+      fs = FileSystem.get(conf);
+      FsPermission sourcePerm = new FsPermission((short) 511); // 511 decimal == 0777 octal
+      FsPermission initialPerm = new FsPermission((short) 448); // 448 decimal == 0700 octal
+      sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
+      workBase = TestDistCpUtils.createTestSetup(fs, initialPerm); // work path is set up with a different permission than the source
+      targetBase = "/tmp1/" + rand.nextLong(); // final path: a random name under /tmp1, separate from workBase
+      final DistCpOptions options = new DistCpOptions.Builder(
+              Collections.singletonList(new Path(sourceBase)), new Path("/out"))
+              .preserve(FileAttribute.PERMISSION).build(); // only PERMISSION is requested for preservation
+      options.appendToConf(conf);
+      final DistCpContext context = new DistCpContext(options);
+      context.setTargetPathExists(false);
+      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+      Path listingFile = new Path("/tmp1/" + rand.nextLong());
+      listing.buildListing(listingFile, context);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); // final and work paths are deliberately different here
+      conf.set(CONF_LABEL_TARGET_WORK_PATH, workBase);
+      conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
+      committer.commitJob(jobContext);
+      checkDirectoryPermissions(fs, targetBase, sourcePerm); // checked against the final path with the source permission, not the work path
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp1"); // NOTE(review): fs is still null if FileSystem.get threw - confirm delete() tolerates null
+      conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); // presumably set by options.appendToConf above - confirm
+    }
+  }
+
+  @Test
   public void testDeleteMissing() throws IOException {
     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
@@ -207,8 +248,8 @@ public class TestCopyCommitter {
       Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
       listing.buildListing(listingFile, context);
 
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
 
       committer.commitJob(jobContext);
       verifyFoldersAreInSync(fs, targetBase, sourceBase);
@@ -256,8 +297,8 @@ public class TestCopyCommitter {
       Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
       listing.buildListing(listingFile, context);
 
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
 
       Path sourceListing = new Path(
               conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
@@ -320,8 +361,8 @@ public class TestCopyCommitter {
       Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
       listing.buildListing(listingFile, context);
 
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
 
       committer.commitJob(jobContext);
       verifyFoldersAreInSync(fs, targetBase, sourceBase);
@@ -353,8 +394,8 @@ public class TestCopyCommitter {
       fs = FileSystem.get(conf);
       fs.mkdirs(new Path(workPath));
 
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+      conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
       conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
 
       assertPathExists(fs, "Work path", new Path(workPath));
@@ -391,8 +432,8 @@ public class TestCopyCommitter {
       fs.mkdirs(new Path(workPath));
       fs.mkdirs(new Path(finalPath));
 
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+      conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
       conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
 
       assertPathExists(fs, "Work path", new Path(workPath));
@@ -463,8 +504,8 @@ public class TestCopyCommitter {
           + String.valueOf(rand.nextLong()));
       listing.buildListing(listingFile, context);
 
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
-      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
 
       OutputCommitter committer = new CopyCommitter(
           null, taskAttemptContext);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org