You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2019/10/18 13:26:04 UTC

[hadoop] branch branch-3.1 updated: MAPREDUCE-6441. Improve temporary directory name generation in LocalDistributedCacheManager for concurrent processes (wattsinabox, rchiang, haibochen via snemeth)

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 19755b9  MAPREDUCE-6441. Improve temporary directory name generation in LocalDistributedCacheManager for concurrent processes (wattsinabox, rchiang, haibochen via snemeth)
19755b9 is described below

commit 19755b9b369b57362998a1c0aed72dc5e2e18466
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Fri Oct 18 15:25:50 2019 +0200

    MAPREDUCE-6441. Improve temporary directory name generation in LocalDistributedCacheManager for concurrent processes (wattsinabox, rchiang, haibochen via snemeth)
---
 .../mapred/LocalDistributedCacheManager.java       | 13 ++-
 .../org/apache/hadoop/mapred/LocalJobRunner.java   |  2 +-
 .../mapred/TestLocalDistributedCacheManager.java   | 95 ++++++++++++++++------
 3 files changed, 78 insertions(+), 32 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
index 2a14ec3..bcf73d1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -37,7 +37,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.UUID;
 
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileUtil;
@@ -82,7 +82,7 @@ class LocalDistributedCacheManager {
    * @param conf
    * @throws IOException
    */
-  public void setup(JobConf conf) throws IOException {
+  public void setup(JobConf conf, JobID jobId) throws IOException {
     File workDir = new File(System.getProperty("user.dir"));
     
     // Generate YARN local resources objects corresponding to the distributed
@@ -91,9 +91,7 @@ class LocalDistributedCacheManager {
       new LinkedHashMap<String, LocalResource>();
     MRApps.setupDistributedCache(conf, localResources);
     // Generating unique numbers for FSDownload.
-    AtomicLong uniqueNumberGenerator =
-        new AtomicLong(System.currentTimeMillis());
-    
+
     // Find which resources are to be put on the local classpath
     Map<String, Path> classpaths = new HashMap<String, Path>();
     Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
@@ -124,9 +122,10 @@ class LocalDistributedCacheManager {
       Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
       Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
       for (LocalResource resource : localResources.values()) {
+        Path destPathForDownload = new Path(destPath,
+            jobId.toString() + "_" + UUID.randomUUID().toString());
         Callable<Path> download =
-            new FSDownload(localFSFileContext, ugi, conf, new Path(destPath,
-                Long.toString(uniqueNumberGenerator.incrementAndGet())),
+            new FSDownload(localFSFileContext, ugi, conf, destPathForDownload,
                 resource);
         Future<Path> future = exec.submit(download);
         resourcesToPaths.put(resource, future);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 5e7a250..2ab4e76 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -169,7 +169,7 @@ public class LocalJobRunner implements ClientProtocol {
       // Manage the distributed cache.  If there are files to be copied,
       // this will trigger localFile to be re-written again.
       localDistributedCacheManager = new LocalDistributedCacheManager();
-      localDistributedCacheManager.setup(conf);
+      localDistributedCacheManager.setup(conf, jobid);
       
       // Write out configuration file.  Instead of copying it from
       // systemJobFile, we re-write it, since setup(), above, may have
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
index d2814e9..fa60e2d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
@@ -32,6 +32,13 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -54,37 +61,37 @@ import org.mockito.stubbing.Answer;
 @SuppressWarnings("deprecation")
 public class TestLocalDistributedCacheManager {
 
-  private static FileSystem mockfs; 
-  
+  private static FileSystem mockfs;
+
   public static class MockFileSystem extends FilterFileSystem {
     public MockFileSystem() {
       super(mockfs);
     }
   }
-  
+
   private File localDir;
-  
+
   private static void delete(File file) throws IOException {
-    if (file.getAbsolutePath().length() < 5) { 
+    if (file.getAbsolutePath().length() < 5) {
       throw new IllegalArgumentException(
           "Path [" + file + "] is too short, not deleting");
     }
-    if (file.exists()) {  
+    if (file.exists()) {
       if (file.isDirectory()) {
         File[] children = file.listFiles();
         if (children != null) {
           for (File child : children) {
             delete(child);
-          } 
-        } 
-      } 
+          }
+        }
+      }
       if (!file.delete()) {
         throw new RuntimeException(
           "Could not delete path [" + file + "]");
       }
     }
   }
-  
+
   @Before
   public void setup() throws Exception {
     mockfs = mock(FileSystem.class);
@@ -93,7 +100,7 @@ public class TestLocalDistributedCacheManager {
     delete(localDir);
     localDir.mkdirs();
   }
-  
+
   @After
   public void cleanup() throws Exception {
     delete(localDir);
@@ -120,9 +127,10 @@ public class TestLocalDistributedCacheManager {
 
   @Test
   public void testDownload() throws Exception {
+    JobID jobId = new JobID();
     JobConf conf = new JobConf();
     conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
-    
+
     URI mockBase = new URI("mock://test-nn1/");
     when(mockfs.getUri()).thenReturn(mockBase);
     Path working = new Path("mock://test-nn1/user/me/");
@@ -137,14 +145,14 @@ public class TestLocalDistributedCacheManager {
     final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
     final Path filePath = new Path(file);
     File link = new File("link");
-    
+
     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
       @Override
       public FileStatus answer(InvocationOnMock args) throws Throwable {
         Path p = (Path)args.getArguments()[0];
         if("file.txt".equals(p.getName())) {
-         return new FileStatus(201, false, 1, 500, 101, 101, 
-             FsPermission.getDefault(), "me", "me", filePath);
+          return new FileStatus(201, false, 1, 500, 101, 101,
+              FsPermission.getDefault(), "me", "me", filePath);
         }  else {
           throw new FileNotFoundException(p+" not supported by mocking");
         }
@@ -176,7 +184,7 @@ public class TestLocalDistributedCacheManager {
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
     try {
-      manager.setup(conf);
+      manager.setup(conf, jobId);
       assertTrue(link.exists());
     } finally {
       manager.close();
@@ -186,9 +194,10 @@ public class TestLocalDistributedCacheManager {
 
   @Test
   public void testEmptyDownload() throws Exception {
+    JobID jobId = new JobID();
     JobConf conf = new JobConf();
     conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
-    
+
     URI mockBase = new URI("mock://test-nn1/");
     when(mockfs.getUri()).thenReturn(mockBase);
     Path working = new Path("mock://test-nn1/user/me/");
@@ -199,7 +208,7 @@ public class TestLocalDistributedCacheManager {
         return (Path) args.getArguments()[0];
       }
     });
-    
+
     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
       @Override
       public FileStatus answer(InvocationOnMock args) throws Throwable {
@@ -221,7 +230,7 @@ public class TestLocalDistributedCacheManager {
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
     try {
-      manager.setup(conf);
+      manager.setup(conf, jobId);
     } finally {
       manager.close();
     }
@@ -230,9 +239,10 @@ public class TestLocalDistributedCacheManager {
 
   @Test
   public void testDuplicateDownload() throws Exception {
+    JobID jobId = new JobID();
     JobConf conf = new JobConf();
     conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
-    
+
     URI mockBase = new URI("mock://test-nn1/");
     when(mockfs.getUri()).thenReturn(mockBase);
     Path working = new Path("mock://test-nn1/user/me/");
@@ -247,14 +257,14 @@ public class TestLocalDistributedCacheManager {
     final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
     final Path filePath = new Path(file);
     File link = new File("link");
-    
+
     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
       @Override
       public FileStatus answer(InvocationOnMock args) throws Throwable {
         Path p = (Path)args.getArguments()[0];
         if("file.txt".equals(p.getName())) {
-         return new FileStatus(201, false, 1, 500, 101, 101, 
-             FsPermission.getDefault(), "me", "me", filePath);
+          return new FileStatus(201, false, 1, 500, 101, 101,
+              FsPermission.getDefault(), "me", "me", filePath);
         }  else {
           throw new FileNotFoundException(p+" not supported by mocking");
         }
@@ -287,11 +297,48 @@ public class TestLocalDistributedCacheManager {
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
     try {
-      manager.setup(conf);
+      manager.setup(conf, jobId);
       assertTrue(link.exists());
     } finally {
       manager.close();
     }
     assertFalse(link.exists());
   }
+
+  /**
+   * This test tries to replicate the issue with the previous version of
+   * {@link LocalDistributedCacheManager} when the resulting timestamp is
+   * identical to that in another process.  Unfortunately, it is difficult
+   * to mimic such behavior in a single-process unit test.  And mocking
+   * the unique id (timestamp previously, UUID now) won't prove the
+   * validity of one approach over the other.
+   */
+  @Test
+  public void testMultipleCacheSetup() throws Exception {
+    JobID jobId = new JobID();
+    JobConf conf = new JobConf();
+    LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
+
+    final int threadCount = 10;
+    final CyclicBarrier barrier = new CyclicBarrier(threadCount);
+
+    ArrayList<Callable<Void>> setupCallable = new ArrayList<>();
+    for (int i = 0; i < threadCount; ++i) {
+      setupCallable.add(() -> {
+        barrier.await();
+        manager.setup(conf, jobId);
+        return null;
+      });
+    }
+
+    ExecutorService ePool = Executors.newFixedThreadPool(threadCount);
+    try {
+      for (Future<Void> future : ePool.invokeAll(setupCallable)) {
+        future.get();
+      }
+    } finally {
+      ePool.shutdown();
+      manager.close();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org