You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/07/03 22:11:16 UTC

svn commit: r1356907 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/

Author: bobby
Date: Tue Jul  3 20:11:15 2012
New Revision: 1356907

URL: http://svn.apache.org/viewvc?rev=1356907&view=rev
Log:
svn merge -c 1348997 FIXES: MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1356907&r1=1356906&r2=1356907&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Jul  3 20:11:15 2012
@@ -44,6 +44,10 @@ Release 0.23.3 - UNRELEASED
 
     MAPREDUCE-4267. mavenize pipes (tgraves via bobby)
 
+    MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache.
+    (tomwhite)
+
+
   OPTIMIZATIONS
 
     MAPREDUCE-3850. Avoid redundant calls for tokens in TokenCache (Daryn

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1356907&r1=1356906&r2=1356907&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Tue Jul  3 20:11:15 2012
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -45,6 +46,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -72,6 +74,8 @@ class LocalDistributedCacheManager {
   private List<String> localFiles = new ArrayList<String>();
   private List<String> localClasspaths = new ArrayList<String>();
   
+  private List<File> symlinksCreated = new ArrayList<File>();
+  
   private boolean setupCalled = false;
   
   /**
@@ -172,18 +176,51 @@ class LocalDistributedCacheManager {
               .size()])));
     }
     if (DistributedCache.getSymlink(conf)) {
-      // This is not supported largely because, 
-      // for a Child subprocess, the cwd in LocalJobRunner
-      // is not a fresh slate, but rather the user's working directory.
-      // This is further complicated because the logic in
-      // setupWorkDir only creates symlinks if there's a jarfile
-      // in the configuration.
-      LOG.warn("LocalJobRunner does not support " +
-          "symlinking into current working dir.");
+      File workDir = new File(System.getProperty("user.dir"));
+      URI[] archives = DistributedCache.getCacheArchives(conf);
+      URI[] files = DistributedCache.getCacheFiles(conf);
+      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
+      Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+      if (archives != null) {
+        for (int i = 0; i < archives.length; i++) {
+          String link = archives[i].getFragment();
+          String target = new File(localArchives[i].toUri()).getPath();
+          symlink(workDir, target, link);
+        }
+      }
+      if (files != null) {
+        for (int i = 0; i < files.length; i++) {
+          String link = files[i].getFragment();
+          String target = new File(localFiles[i].toUri()).getPath();
+          symlink(workDir, target, link);
+        }
+      }
     }
     setupCalled = true;
   }
   
+  /**
+   * Utility method for creating a symlink and warning on errors.
+   *
+   * If link is null, does nothing.
+   */
+  private void symlink(File workDir, String target, String link)
+      throws IOException {
+    if (link != null) {
+      link = workDir.toString() + Path.SEPARATOR + link;
+      File flink = new File(link);
+      if (!flink.exists()) {
+        LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+        if (0 != FileUtil.symLink(target, link)) {
+          LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
+              link));
+        } else {
+          symlinksCreated.add(new File(link));
+        }
+      }
+    }
+  }
+  
   /** 
    * Are the resources that should be added to the classpath? 
    * Should be called after setup().
@@ -217,6 +254,12 @@ class LocalDistributedCacheManager {
   }
 
   public void close() throws IOException {
+    for (File symlink : symlinksCreated) {
+      if (!symlink.delete()) {
+        LOG.warn("Failed to delete symlink created by the local job runner: " +
+            symlink);
+      }
+    }
     FileContext localFSFileContext = FileContext.getLocalFSFileContext();
     for (String archive : localArchives) {
       localFSFileContext.delete(new Path(archive), true);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1356907&r1=1356906&r2=1356907&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Tue Jul  3 20:11:15 2012
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
@@ -61,6 +62,9 @@ import org.apache.hadoop.mapreduce.serve
 public class TestMRWithDistributedCache extends TestCase {
   private static Path TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"));
+  private static File symlinkFile = new File("distributed.first.symlink");
+  private static File expectedAbsentSymlinkFile =
+    new File("distributed.second.jar");
   private static Configuration conf = new Configuration();
   private static FileSystem localFs;
   static {
@@ -107,20 +111,17 @@ public class TestMRWithDistributedCache 
       TestCase.assertNotNull(cl.getResource("distributed.jar.inside3"));
       TestCase.assertNull(cl.getResource("distributed.jar.inside4"));
 
-
       // Check that the symlink for the renaming was created in the cwd;
-      // This only happens for real for non-local jobtrackers.
-      // (The symlinks exist in "localRunner/" for local Jobtrackers,
-      // but the user has no way to get at them.
-      if (!"local".equals(
-          context.getConfiguration().get(JTConfig.JT_IPC_ADDRESS))) {
-        File symlinkFile = new File("distributed.first.symlink");
-        TestCase.assertTrue("symlink distributed.first.symlink doesn't exist", symlinkFile.exists());
-        TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1, symlinkFile.length());
-      }
+      TestCase.assertTrue("symlink distributed.first.symlink doesn't exist",
+          symlinkFile.exists());
+      TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1,
+          symlinkFile.length());
+      
+      TestCase.assertFalse("second file should not be symlinked",
+          expectedAbsentSymlinkFile.exists());
     }
   }
-
+  
   private void testWithConf(Configuration conf) throws IOException,
       InterruptedException, ClassNotFoundException, URISyntaxException {
     // Create a temporary file of length 1.
@@ -144,11 +145,7 @@ public class TestMRWithDistributedCache 
     job.addFileToClassPath(second);
     job.addArchiveToClassPath(third);
     job.addCacheArchive(fourth.toUri());
-    
-    // don't create symlink for LocalJobRunner
-    if (!"local".equals(conf.get(JTConfig.JT_IPC_ADDRESS))) {
-      job.createSymlink();
-    }
+    job.createSymlink();
     job.setMaxMapAttempts(1); // speed up failures
 
     job.submit();
@@ -157,10 +154,17 @@ public class TestMRWithDistributedCache 
 
   /** Tests using the local job runner. */
   public void testLocalJobRunner() throws Exception {
+    symlinkFile.delete(); // ensure symlink is not present (e.g. if test is
+                          // killed part way through)
+    
     Configuration c = new Configuration();
     c.set(JTConfig.JT_IPC_ADDRESS, "local");
     c.set("fs.defaultFS", "file:///");
     testWithConf(c);
+    
+    assertFalse("Symlink not removed by local job runner",
+            // Symlink target will have gone so can't use File.exists()
+            Arrays.asList(new File(".").list()).contains(symlinkFile.getName()));
   }
 
   private Path createTempFile(String filename, String contents)