You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2019/08/01 14:07:13 UTC

[hadoop] branch branch-3.2 updated: MAPREDUCE-7225: Fix broken current folder expansion during MR job start. Contributed by Peter Bacsko.

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new e611fb8  MAPREDUCE-7225: Fix broken current folder expansion during MR job start. Contributed by Peter Bacsko.
e611fb8 is described below

commit e611fb878bd34b5083f7988e28a86933b181db2c
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Thu Aug 1 16:05:28 2019 +0200

    MAPREDUCE-7225: Fix broken current folder expansion during MR job start. Contributed by Peter Bacsko.
---
 .../hadoop/mapreduce/JobResourceUploader.java      | 25 ++++++++-
 .../hadoop/mapreduce/TestJobResourceUploader.java  | 64 +++++++++++++++++++++-
 2 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
index e106a54..c8686d7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -59,6 +59,8 @@ import com.google.common.annotations.VisibleForTesting;
 class JobResourceUploader {
   protected static final Logger LOG =
       LoggerFactory.getLogger(JobResourceUploader.class);
+  private static final String ROOT_PATH = "/";
+
   private final boolean useWildcard;
   private final FileSystem jtFs;
   private SharedCacheClient scClient = null;
@@ -674,9 +676,30 @@ class JobResourceUploader {
     if (FileUtil.compareFs(remoteFs, jtFs)) {
       return originalPath;
     }
+
+    boolean root = false;
+    if (ROOT_PATH.equals(originalPath.toUri().getPath())) {
+      // "/" needs special treatment
+      root = true;
+    } else {
+      // If originalPath ends in a "/", then remove it so
+      // that originalPath.getName() does not return an empty string
+      String uriString = originalPath.toUri().toString();
+      if (uriString.endsWith("/")) {
+        try {
+          URI strippedURI =
+              new URI(uriString.substring(0, uriString.length() - 1));
+          originalPath = new Path(strippedURI);
+        } catch (URISyntaxException e) {
+          throw new IllegalArgumentException("Error processing URI", e);
+        }
+      }
+    }
+
     // this might have name collisions. copy will throw an exception
     // parse the original path to create new path
-    Path newPath = new Path(parentDir, originalPath.getName());
+    Path newPath = root ?
+        parentDir : new Path(parentDir, originalPath.getName());
     FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
     jtFs.setReplication(newPath, replication);
     jtFs.makeQualified(newPath);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
index d347da5..8ab54a6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce;
 
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.never;
@@ -25,9 +26,11 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.times;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -46,8 +49,10 @@ import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.verification.VerificationMode;
 
+
 /**
  * A class for unit testing JobResourceUploader.
  */
@@ -375,6 +380,50 @@ public class TestJobResourceUploader {
     testErasureCodingSetting(false);
   }
 
+  @Test
+  public void testOriginalPathEndsInSlash()
+      throws IOException, URISyntaxException {
+    testOriginalPathWithTrailingSlash(
+        new Path(new URI("file:/local/mapred/test/")),
+        new Path("hdfs://localhost:1234/home/hadoop/test/"));
+  }
+
+  @Test
+  public void testOriginalPathIsRoot() throws IOException, URISyntaxException {
+    testOriginalPathWithTrailingSlash(
+        new Path(new URI("file:/")),
+        new Path("hdfs://localhost:1234/home/hadoop/"));
+  }
+
+  private void testOriginalPathWithTrailingSlash(Path path,
+      Path expectedRemotePath) throws IOException, URISyntaxException {
+    Path dstPath = new Path("hdfs://localhost:1234/home/hadoop/");
+    DistributedFileSystem fs = mock(DistributedFileSystem.class);
+    // make sure that FileUtil.copy() doesn't try to copy anything
+    when(fs.mkdirs(any(Path.class))).thenReturn(false);
+    when(fs.getUri()).thenReturn(dstPath.toUri());
+
+    JobResourceUploader uploader = new StubedUploader(fs, true, true);
+    JobConf jConf = new JobConf();
+    Path originalPath = spy(path);
+    FileSystem localFs = mock(FileSystem.class);
+    FileStatus fileStatus = mock(FileStatus.class);
+    when(localFs.getFileStatus(any(Path.class))).thenReturn(fileStatus);
+    when(fileStatus.isDirectory()).thenReturn(true);
+    when(fileStatus.getPath()).thenReturn(originalPath);
+
+    doReturn(localFs).when(originalPath)
+      .getFileSystem(any(Configuration.class));
+    when(localFs.getUri()).thenReturn(path.toUri());
+
+    uploader.copyRemoteFiles(dstPath,
+        originalPath, jConf, (short) 1);
+
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    verify(fs).makeQualified(pathCaptor.capture());
+    Assert.assertEquals("Path", expectedRemotePath, pathCaptor.getValue());
+  }
+
   private void testErasureCodingSetting(boolean defaultBehavior)
       throws IOException {
     JobConf jConf = new JobConf();
@@ -387,7 +436,7 @@ public class TestJobResourceUploader {
     DistributedFileSystem fs = mock(DistributedFileSystem.class);
     Path path = new Path("/");
     when(fs.makeQualified(any(Path.class))).thenReturn(path);
-    JobResourceUploader uploader = new StubedUploader(fs, true);
+    JobResourceUploader uploader = new StubedUploader(fs, true, false);
     Job job = Job.getInstance(jConf);
 
     uploader.uploadResources(job, new Path("/test"));
@@ -728,6 +777,8 @@ public class TestJobResourceUploader {
   }
 
   private class StubedUploader extends JobResourceUploader {
+    private boolean callOriginalCopy = false;
+
     StubedUploader(JobConf conf) throws IOException {
       this(conf, false);
     }
@@ -736,8 +787,10 @@ public class TestJobResourceUploader {
       super(FileSystem.getLocal(conf), useWildcard);
     }
 
-    StubedUploader(FileSystem fs, boolean useWildcard) throws IOException {
+    StubedUploader(FileSystem fs, boolean useWildcard,
+        boolean callOriginalCopy) throws IOException {
       super(fs, useWildcard);
+      this.callOriginalCopy = callOriginalCopy;
     }
 
     @Override
@@ -757,7 +810,12 @@ public class TestJobResourceUploader {
     @Override
     Path copyRemoteFiles(Path parentDir, Path originalPath, Configuration conf,
         short replication) throws IOException {
-      return new Path(destinationPathPrefix + originalPath.getName());
+      if (callOriginalCopy) {
+        return super.copyRemoteFiles(
+            parentDir, originalPath, conf, replication);
+      } else {
+        return new Path(destinationPathPrefix + originalPath.getName());
+      }
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org