You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/07/03 22:11:16 UTC
svn commit: r1356907 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/ja...
Author: bobby
Date: Tue Jul 3 20:11:15 2012
New Revision: 1356907
URL: http://svn.apache.org/viewvc?rev=1356907&view=rev
Log:
svn merge -c 1348997 FIXES: MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache.
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1356907&r1=1356906&r2=1356907&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Jul 3 20:11:15 2012
@@ -44,6 +44,10 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4267. mavenize pipes (tgraves via bobby)
+ MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache.
+ (tomwhite)
+
+
OPTIMIZATIONS
MAPREDUCE-3850. Avoid redundant calls for tokens in TokenCache (Daryn
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1356907&r1=1356906&r2=1356907&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Tue Jul 3 20:11:15 2012
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
+import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -45,6 +46,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
@@ -72,6 +74,8 @@ class LocalDistributedCacheManager {
private List<String> localFiles = new ArrayList<String>();
private List<String> localClasspaths = new ArrayList<String>();
+ private List<File> symlinksCreated = new ArrayList<File>();
+
private boolean setupCalled = false;
/**
@@ -172,18 +176,51 @@ class LocalDistributedCacheManager {
.size()])));
}
if (DistributedCache.getSymlink(conf)) {
- // This is not supported largely because,
- // for a Child subprocess, the cwd in LocalJobRunner
- // is not a fresh slate, but rather the user's working directory.
- // This is further complicated because the logic in
- // setupWorkDir only creates symlinks if there's a jarfile
- // in the configuration.
- LOG.warn("LocalJobRunner does not support " +
- "symlinking into current working dir.");
+ File workDir = new File(System.getProperty("user.dir"));
+ URI[] archives = DistributedCache.getCacheArchives(conf);
+ URI[] files = DistributedCache.getCacheFiles(conf);
+ Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
+ Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+ if (archives != null) {
+ for (int i = 0; i < archives.length; i++) {
+ String link = archives[i].getFragment();
+ String target = new File(localArchives[i].toUri()).getPath();
+ symlink(workDir, target, link);
+ }
+ }
+ if (files != null) {
+ for (int i = 0; i < files.length; i++) {
+ String link = files[i].getFragment();
+ String target = new File(localFiles[i].toUri()).getPath();
+ symlink(workDir, target, link);
+ }
+ }
}
setupCalled = true;
}
+ /**
+ * Utility method for creating a symlink and warning on errors.
+ *
+ * If link is null, does nothing.
+ */
+ private void symlink(File workDir, String target, String link)
+ throws IOException {
+ if (link != null) {
+ link = workDir.toString() + Path.SEPARATOR + link;
+ File flink = new File(link);
+ if (!flink.exists()) {
+ LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+ if (0 != FileUtil.symLink(target, link)) {
+ LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
+ link));
+ } else {
+ symlinksCreated.add(new File(link));
+ }
+ }
+ }
+ }
+
/**
* Are the resources that should be added to the classpath?
* Should be called after setup().
@@ -217,6 +254,12 @@ class LocalDistributedCacheManager {
}
public void close() throws IOException {
+ for (File symlink : symlinksCreated) {
+ if (!symlink.delete()) {
+ LOG.warn("Failed to delete symlink created by the local job runner: " +
+ symlink);
+ }
+ }
FileContext localFSFileContext = FileContext.getLocalFSFileContext();
for (String archive : localArchives) {
localFSFileContext.delete(new Path(archive), true);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1356907&r1=1356906&r2=1356907&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Tue Jul 3 20:11:15 2012
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Arrays;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
@@ -61,6 +62,9 @@ import org.apache.hadoop.mapreduce.serve
public class TestMRWithDistributedCache extends TestCase {
private static Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"));
+ private static File symlinkFile = new File("distributed.first.symlink");
+ private static File expectedAbsentSymlinkFile =
+ new File("distributed.second.jar");
private static Configuration conf = new Configuration();
private static FileSystem localFs;
static {
@@ -107,20 +111,17 @@ public class TestMRWithDistributedCache
TestCase.assertNotNull(cl.getResource("distributed.jar.inside3"));
TestCase.assertNull(cl.getResource("distributed.jar.inside4"));
-
// Check that the symlink for the renaming was created in the cwd;
- // This only happens for real for non-local jobtrackers.
- // (The symlinks exist in "localRunner/" for local Jobtrackers,
- // but the user has no way to get at them.
- if (!"local".equals(
- context.getConfiguration().get(JTConfig.JT_IPC_ADDRESS))) {
- File symlinkFile = new File("distributed.first.symlink");
- TestCase.assertTrue("symlink distributed.first.symlink doesn't exist", symlinkFile.exists());
- TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1, symlinkFile.length());
- }
+ TestCase.assertTrue("symlink distributed.first.symlink doesn't exist",
+ symlinkFile.exists());
+ TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1,
+ symlinkFile.length());
+
+ TestCase.assertFalse("second file should not be symlinked",
+ expectedAbsentSymlinkFile.exists());
}
}
-
+
private void testWithConf(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException, URISyntaxException {
// Create a temporary file of length 1.
@@ -144,11 +145,7 @@ public class TestMRWithDistributedCache
job.addFileToClassPath(second);
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
-
- // don't create symlink for LocalJobRunner
- if (!"local".equals(conf.get(JTConfig.JT_IPC_ADDRESS))) {
- job.createSymlink();
- }
+ job.createSymlink();
job.setMaxMapAttempts(1); // speed up failures
job.submit();
@@ -157,10 +154,17 @@ public class TestMRWithDistributedCache
/** Tests using the local job runner. */
public void testLocalJobRunner() throws Exception {
+ symlinkFile.delete(); // ensure symlink is not present (e.g. if test is
+ // killed part way through)
+
Configuration c = new Configuration();
c.set(JTConfig.JT_IPC_ADDRESS, "local");
c.set("fs.defaultFS", "file:///");
testWithConf(c);
+
+ assertFalse("Symlink not removed by local job runner",
+ // Symlink target will have gone so can't use File.exists()
+ Arrays.asList(new File(".").list()).contains(symlinkFile.getName()));
}
private Path createTempFile(String filename, String contents)