You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/10/22 21:50:23 UTC

svn commit: r1401054 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/o...

Author: jlowe
Date: Mon Oct 22 19:50:23 2012
New Revision: 1401054

URL: http://svn.apache.org/viewvc?rev=1401054&view=rev
Log:
MAPREDUCE-4740. only .jars can be added to the Distributed Cache classpath. Contributed by Robert Joseph Evans

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1401054&r1=1401053&r2=1401054&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Oct 22 19:50:23 2012
@@ -599,6 +599,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many
     reducers complete consecutively. (Jason Lowe via vinodkv)
 
+    MAPREDUCE-4740. only .jars can be added to the Distributed Cache
+    classpath. (Robert Joseph Evans via jlowe)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1401054&r1=1401053&r2=1401054&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Mon Oct 22 19:50:23 2012
@@ -191,6 +191,7 @@ public class MRApps extends Apps {
     // TODO: Remove duplicates.
   }
   
+  @SuppressWarnings("deprecation")
   public static void setClasspath(Map<String, String> environment,
       Configuration conf) throws IOException {
     boolean userClassesTakesPrecedence = 
@@ -218,11 +219,66 @@ public class MRApps extends Apps {
         environment,
         Environment.CLASSPATH.name(),
         Environment.PWD.$() + Path.SEPARATOR + "*");
+    // a * in the classpath will only find a .jar, so we need to filter out
+    // all .jars and add everything else
+    addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
+        DistributedCache.getCacheFiles(conf),
+        conf,
+        environment);
+    addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
+        DistributedCache.getCacheArchives(conf),
+        conf,
+        environment);
     if (userClassesTakesPrecedence) {
       MRApps.setMRFrameworkClasspath(environment, conf);
     }
   }
   
+  /**
+   * Add the paths to the classpath if they are not jars ($PWD/* finds jars)
+   * @param paths the paths to add to the classpath
+   * @param withLinks the corresponding paths that may have a link name in them
+   * @param conf used to resolve the paths
+   * @param environment the environment to update CLASSPATH in
+   * @throws IOException if there is an error resolving any of the paths.
+   */
+  private static void addToClasspathIfNotJar(Path[] paths,
+      URI[] withLinks, Configuration conf,
+      Map<String, String> environment) throws IOException {
+    if (paths != null) {
+      HashMap<Path, String> linkLookup = new HashMap<Path, String>();
+      if (withLinks != null) {
+        for (URI u: withLinks) {
+          Path p = new Path(u);
+          FileSystem remoteFS = p.getFileSystem(conf);
+          p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+              remoteFS.getWorkingDirectory()));
+          String name = (null == u.getFragment())
+              ? p.getName() : u.getFragment();
+          if (!name.toLowerCase(java.util.Locale.ENGLISH).endsWith(".jar")) {
+            linkLookup.put(p, name);
+          }
+        }
+      }
+      // Locale is pinned so ".JAR" still lowercases to ".jar" in e.g. tr_TR
+      for (Path p : paths) {
+        FileSystem remoteFS = p.getFileSystem(conf);
+        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory()));
+        String name = linkLookup.get(p);
+        if (name == null) {
+          name = p.getName();
+        }
+        if (!name.toLowerCase(java.util.Locale.ENGLISH).endsWith(".jar")) {
+          Apps.addToEnvironment(
+              environment,
+              Environment.CLASSPATH.name(),
+              Environment.PWD.$() + Path.SEPARATOR + name);
+        }
+      }
+    }
+  }
+
   private static final String STAGING_CONSTANT = ".staging";
   public static Path getStagingAreaDir(Configuration conf, String user) {
     return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
@@ -261,8 +317,7 @@ public class MRApps extends Apps {
         DistributedCache.getCacheArchives(conf), 
         parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), 
         getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
-        DistributedCache.getArchiveVisibilities(conf), 
-        DistributedCache.getArchiveClassPaths(conf));
+        DistributedCache.getArchiveVisibilities(conf));
     
     // Cache files
     parseDistributedCacheArtifacts(conf, 
@@ -271,8 +326,7 @@ public class MRApps extends Apps {
         DistributedCache.getCacheFiles(conf),
         parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
         getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
-        DistributedCache.getFileVisibilities(conf),
-        DistributedCache.getFileClassPaths(conf));
+        DistributedCache.getFileVisibilities(conf));
   }
 
   private static String getResourceDescription(LocalResourceType type) {
@@ -289,8 +343,8 @@ public class MRApps extends Apps {
       Configuration conf,
       Map<String, LocalResource> localResources,
       LocalResourceType type,
-      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
-      Path[] pathsToPutOnClasspath) throws IOException {
+      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[])
+  throws IOException {
 
     if (uris != null) {
       // Sanity check
@@ -304,15 +358,6 @@ public class MRApps extends Apps {
             );
       }
       
-      Map<String, Path> classPaths = new HashMap<String, Path>();
-      if (pathsToPutOnClasspath != null) {
-        for (Path p : pathsToPutOnClasspath) {
-          FileSystem remoteFS = p.getFileSystem(conf);
-          p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
-              remoteFS.getWorkingDirectory()));
-          classPaths.put(p.toUri().getPath().toString(), p);
-        }
-      }
       for (int i = 0; i < uris.length; ++i) {
         URI u = uris[i];
         Path p = new Path(u);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1401054&r1=1401053&r2=1401054&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Mon Oct 22 19:50:23 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.util;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
@@ -42,12 +44,36 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 public class TestMRApps {
+  private static File testWorkDir = null;
+  
+  @BeforeClass
+  public static void setupTestDirs() throws IOException { // prepares the shared scratch directory
+    testWorkDir = new File("target", TestMRApps.class.getCanonicalName());
+    delete(testWorkDir); // recursive delete before the directory is re-created
+    testWorkDir.mkdirs();
+    testWorkDir = testWorkDir.getAbsoluteFile(); // keep the absolute form in the static field
+  }
+  
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException { // removes the scratch directory
+    if (testWorkDir != null) { // still null if setupTestDirs never assigned it
+      delete(testWorkDir);
+    }
+  }
+  
+  private static void delete(File dir) throws IOException { // recursive delete through the Hadoop FileSystem API
+    Path p = new Path("file://"+dir.getAbsolutePath());
+    Configuration conf = new Configuration();
+    FileSystem fs = p.getFileSystem(conf); // resolved from the file:// path; presumably the local file system
+    fs.delete(p, true); // true = recursive; the boolean result is ignored
+  }
 
   @Test public void testJobIDtoString() {
     JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
@@ -154,6 +180,28 @@ public class TestMRApps {
     }
     assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
   }
+  
+  @Test public void testSetClasspathWithArchives () throws IOException { // non-jar archive must reach CLASSPATH
+    File testTGZ = new File(testWorkDir, "test.tgz");
+    FileOutputStream out = new FileOutputStream(testTGZ);
+    out.write(0); // one-byte placeholder; the test does not read the content back
+    out.close();
+    Job job = Job.getInstance();
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.CLASSPATH_ARCHIVES, "file://" 
+        + testTGZ.getAbsolutePath());
+    conf.set(MRJobConfig.CACHE_ARCHIVES, "file://"
+        + testTGZ.getAbsolutePath() + "#testTGZ"); // the URI fragment supplies the link name
+    Map<String, String> environment = new HashMap<String, String>();
+    MRApps.setClasspath(environment, conf);
+    assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
+    String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
+    if (confClasspath != null) {
+      confClasspath = confClasspath.replaceAll(",\\s*", ":").trim(); // comma-separated list -> ':'-joined
+    }
+    assertTrue(environment.get("CLASSPATH").contains(confClasspath)); // NOTE(review): String.contains(null) throws NPE if the property is unset
+    assertTrue(environment.get("CLASSPATH").contains("testTGZ")); // the archive is expected under its link name
+  }
 
  @Test public void testSetClasspathWithUserPrecendence() {
     Configuration conf = new Configuration();