You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/07/09 20:55:52 UTC

svn commit: r1501462 - in /hadoop/common/branches/branch-1-win: CHANGES.branch-1-win.txt src/mapred/mapred-default.xml src/mapred/org/apache/hadoop/mapred/JobClient.java src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java

Author: cnauroth
Date: Tue Jul  9 18:55:51 2013
New Revision: 1501462

URL: http://svn.apache.org/r1501462
Log:
MAPREDUCE-5278. Distributed cache is broken when JT staging dir is not on the default FS. Contributed by Xi Fang.

Modified:
    hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
    hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java

Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1501462&r1=1501461&r2=1501462&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Tue Jul  9 18:55:51 2013
@@ -298,6 +298,9 @@ Branch-hadoop-1-win (branched from branc
     MAPREDUCE-5371. TestProxyUserFromEnv#testProxyUserFromEnvironment failed
     caused by domains of windows users. (Xi Fang via cnauroth)
 
+    MAPREDUCE-5278. Distributed cache is broken when JT staging dir is not on the
+    default FS. (Xi Fang via cnauroth)
+
   Merged from branch-1
 
     HDFS-385. Backport: Add support for an experimental API that allows a

Modified: hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml?rev=1501462&r1=1501461&r2=1501462&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml Tue Jul  9 18:55:51 2013
@@ -415,6 +415,15 @@
 </property>
 
 <property>
+  <name>mapreduce.client.accessible.remote.schemes</name>
+  <value></value>
+  <description>The schemes of the file systems that are accessible from 
+  all the nodes in the cluster. Used by the job client to avoid copying 
+  distributed cache entries to the job staging dir if path is accessible.
+  </description>
+</property>
+
+<property>
   <name>mapred.tasktracker.reduce.tasks.maximum</name>
   <value>2</value>
   <description>The maximum number of reduce tasks that will be run

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1501462&r1=1501461&r2=1501462&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Jul  9 18:55:51 2013
@@ -47,6 +47,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -450,6 +451,9 @@ public class JobClient extends Configure
   private static final String TASKLOG_PULL_TIMEOUT_KEY = 
     "mapreduce.client.tasklog.timeout";
   private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
+  @Private
+  public static final String CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY =
+    "mapreduce.client.accessible.remote.schemes";
   static int tasklogtimeout;
 
   public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
@@ -688,18 +692,33 @@ public class JobClient extends Configure
   private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, 
       final Path originalPath, final JobConf job, short replication) 
   throws IOException, InterruptedException {
-    //check if we do not need to copy the files
-    // is jt using the same file system.
-    // just checking for uri strings... doing no dns lookups 
-    // to see if the filesystems are the same. This is not optimal.
-    // but avoids name resolution.
     
     FileSystem remoteFs = null;
     remoteFs = originalPath.getFileSystem(job);
+
+    // Check if we do not need to copy the files
     
+    // Check whether the given path is accessible from all the 
+    // nodes in the cluster in which case we skip the copy operation 
+    // altogether and pass the original path.
+    String remoteFsScheme = remoteFs.getUri().getScheme();
+    String [] accessibleSchemes = job.getStrings(
+        CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY, null);
+    if (accessibleSchemes != null) {
+      for (String s : accessibleSchemes) {
+        if (remoteFsScheme.equalsIgnoreCase(s)) {
+          return originalPath;
+        }
+      }
+    }     
+    // Check whether jt is using the same file system.
+    // just checking for uri strings... doing no dns lookups 
+    // to see if the filesystems are the same. This is not optimal.
+    // but avoids name resolution.
     if (compareFs(remoteFs, jtFs)) {
       return originalPath;
     }
+    
     // this might have name collisions. copy will throw an exception
     //parse the original path to create new path
     Path newPath = new Path(parentDir, originalPath.getName());
@@ -808,7 +827,7 @@ public class JobClient extends Configure
         Path tmp = new Path(tmpjars);
         Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
         DistributedCache.addArchiveToClassPath
-          (new Path(newPath.toUri().getPath()), job, fs);
+          (new Path(newPath.toUri().getPath()), job, newPath.getFileSystem(job));
       }
     }
     

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java?rev=1501462&r1=1501461&r2=1501462&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java Tue Jul  9 18:55:51 2013
@@ -36,10 +36,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -179,6 +181,96 @@ public class TestMRWithDistributedCache 
 
   }
 
+  /**
+   * Tests setting job tracker's staging dir to a nondefault filesystem. 
+   * It validates that distributed cache entries are not copied to the staging 
+   * dir for schemes defined in "mapreduce.client.accessible.remote.schemes" 
+   * (denoted by JobClient.CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY).
+   */
+  public void testJTStagingOnNondefaultFS() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr  = null;
+    try {
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      FileSystem fileSys = dfs.getFileSystem();
+      mr = new MiniMRCluster(1, fileSys.getUri().toString(), 1);
+      runWithConfJTStagingOnNondefaultFS(mr.createJobConf());
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+  
+  private void runWithConfJTStagingOnNondefaultFS(JobConf conf) 
+      throws IOException, InterruptedException, 
+      ClassNotFoundException, URISyntaxException {
+    // Create a temporary file of length 1.
+    Path first = createTempFile("distributed.first", "x")
+        .makeQualified(localFs);
+    // Create two jars with a single file inside them.
+    Path second =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2)
+            .makeQualified(localFs);
+    Path third =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3)
+            .makeQualified(localFs);
+    // Set configuration properties for this job
+    conf.set("fs.default.name", "file:///");
+    conf.set("tmpfiles", first.toString());
+    conf.set("tmpjars", second.toString());
+    conf.set("tmparchives", third.toString()); 
+    conf.set(JobClient.CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY, 
+        localFs.getUri().getScheme());
+    conf.setMaxMapAttempts(1); // speed up failures
+    // Submit job
+    Job job = new Job(conf);
+    job.setMapperClass(DistributedCacheCheckerJTStagingOnNondefaultFS.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    FileInputFormat.setInputPaths(job, first);
+    job.submit();
+    // Check if the job is successful
+    assertTrue(job.waitForCompletion(false));
+  }
+  
+  public static class DistributedCacheCheckerJTStagingOnNondefaultFS extends
+      Mapper<LongWritable, Text, NullWritable, NullWritable> {
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      Path[] files = DistributedCache.getLocalCacheFiles(conf);
+      Path[] archives = DistributedCache.getLocalCacheArchives(conf);
+      // Check that 1 file and 2 archives are present
+      assertEquals(1, files.length);
+      assertEquals(2, archives.length);
+      // Check lengths of the file
+      assertEquals(1, localFs.getFileStatus(files[0]).getLen());
+      // Check the existence of the archives
+      assertTrue(localFs
+          .exists(new Path(archives[0], "distributed.jar.inside2")));
+      assertTrue(localFs
+          .exists(new Path(archives[1], "distributed.jar.inside3")));
+      // Check the schemes of the files/archives specified in
+      // "mapred.cache.archives" and "mapred.cache.files". The schemes
+      // must be "file" if we do not copy the files/archives to jt's staging
+      // dir
+      String[] arxSources = conf.getStrings("mapred.cache.archives");
+      String[] fileSources = conf.getStrings("mapred.cache.files");
+      assertEquals("file", (new Path(fileSources[0])).toUri().getScheme());
+      assertEquals("file", (new Path(arxSources[0])).toUri().getScheme());
+      assertEquals("file", (new Path(arxSources[1])).toUri().getScheme());
+      // Check the class loaders
+      ClassLoader cl = Thread.currentThread().getContextClassLoader();
+      // The archives added by "tmpjars" should be reachable via 
+      // the class loader.
+      TestCase.assertNotNull(cl.getResource("distributed.jar.inside2"));
+      TestCase.assertNull(cl.getResource("distributed.jar.inside3"));
+    }
+  }
+  
   private Path createTempFile(String filename, String contents)
       throws IOException {
     Path path = new Path(TEST_ROOT_DIR, filename);