You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/12/15 21:55:59 UTC

svn commit: r890983 - in /hadoop/mapreduce/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/mapred/ sr...

Author: tomwhite
Date: Tue Dec 15 20:55:58 2009
New Revision: 890983

URL: http://svn.apache.org/viewvc?rev=890983&view=rev
Log:
MAPREDUCE-967. TaskTracker does not need to fully unjar job jars. Contributed by Todd Lipcon.

Added:
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Dec 15 20:55:58 2009
@@ -74,6 +74,9 @@
     MAPREDUCE-1209. Move common specific part of the test TestReflectionUtils
     out of mapred into common. (Todd Lipcon via tomwhite)
 
+    MAPREDUCE-967. TaskTracker does not need to fully unjar job jars.
+    (Todd Lipcon via tomwhite)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Dec 15 20:55:58 2009
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.regex.Pattern;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
@@ -272,9 +273,16 @@
       
       values = cmdLine.getOptionValues("file");
       if (values != null && values.length > 0) {
+        StringBuilder unpackRegex = new StringBuilder(
+          config_.getPattern(JobContext.JAR_UNPACK_PATTERN,
+                             JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern());
         for (String file : values) {
           packageFiles_.add(file);
+          String fname = new File(file).getName();
+          unpackRegex.append("|(?:").append(Pattern.quote(fname)).append(")");
         }
+        config_.setPattern(JobContext.JAR_UNPACK_PATTERN,
+                           Pattern.compile(unpackRegex.toString()));
         validate(packageFiles_);
       }
          

Added: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java?rev=890983&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java Tue Dec 15 20:55:58 2009
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.zip.GZIPOutputStream;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
+/**
+ * This class tests that the '-file' argument to streaming results
+ * in files being unpacked in the job working directory.
+ */
+public class TestFileArgs extends TestStreaming
+{
+  private MiniDFSCluster dfs = null;
+  private MiniMRCluster mr = null;
+  private FileSystem fileSys = null;
+  private String strJobTracker = null;
+  private String strNamenode = null;
+  private String namenode = null;
+  private Configuration conf = null;
+
+  private static final String EXPECTED_OUTPUT =
+    "job.jar\t\nsidefile\t\ntmp\t\n";
+
+  private static final String LS_PATH = "/bin/ls";
+
+  public TestFileArgs() throws IOException
+  {
+    // Set up mini cluster
+    conf = new Configuration();
+    dfs = new MiniDFSCluster(conf, 1, true, null);
+    fileSys = dfs.getFileSystem();
+    namenode = fileSys.getUri().getAuthority();
+    mr  = new MiniMRCluster(1, namenode, 1);
+    strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
+    strNamenode = "fs.default.name=hdfs://" + namenode;
+
+    FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
+
+    // Set up side file
+    FileSystem localFs = FileSystem.getLocal(conf);
+    DataOutputStream dos = localFs.create(new Path("sidefile"));
+    dos.write("hello world\n".getBytes("UTF-8"));
+    dos.close();
+  }
+
+  @Override
+  protected String getExpectedOutput() {
+    return EXPECTED_OUTPUT;
+  }
+
+  @Override
+  protected Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-file", new java.io.File("sidefile").getAbsolutePath(),
+      "-mapper", LS_PATH,
+      "-numReduceTasks", "0",
+      "-jobconf", strNamenode,
+      "-jobconf", strJobTracker,
+      "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestFileArgs().testCommandLine();
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Tue Dec 15 20:55:58 2009
@@ -22,6 +22,10 @@
 import java.io.*;
 
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+
 
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
@@ -50,11 +54,15 @@
     utilTest.redirectIfAntJunit();
   }
 
+  protected String getInputData() {
+    return input;
+  }
+
   protected void createInput() throws IOException
   {
-    DataOutputStream out = new DataOutputStream(
-                                                new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
-    out.write(input.getBytes("UTF-8"));
+    DataOutputStream out = getFileSystem().create(
+      new Path(INPUT_FILE.getAbsolutePath()));
+    out.write(getInputData().getBytes("UTF-8"));
     out.close();
   }
 
@@ -70,7 +78,29 @@
       "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
     };
   }
-  
+
+  protected Configuration getConf() {
+    return new Configuration();
+  }
+
+  protected FileSystem getFileSystem() throws IOException {
+    return FileSystem.get(getConf());
+  }
+
+  protected String getExpectedOutput() {
+    return outputExpect;
+  }
+
+  protected void checkOutput() throws IOException {
+    Path outPath = new Path(OUTPUT_DIR.getAbsolutePath(), "part-00000");
+    FileSystem fs = getFileSystem();
+    String output = StreamUtil.slurpHadoop(outPath, fs);
+    fs.delete(outPath, true);
+    System.err.println("outEx1=" + getExpectedOutput());
+    System.err.println("  out1=" + output);
+    assertEquals(getExpectedOutput(), output);
+  }
+
   public void testCommandLine() throws IOException
   {
     try {
@@ -84,14 +114,10 @@
 
       // During tests, the default Configuration will use a local mapred
       // So don't specify -config or -cluster
-      job = new StreamJob(genArgs(), mayExit);      
-      job.go();
-      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
-      String output = StreamUtil.slurp(outFile);
-      outFile.delete();
-      System.err.println("outEx1=" + outputExpect);
-      System.err.println("  out1=" + output);
-      assertEquals(outputExpect, output);
+      job = new StreamJob(genArgs(), mayExit);
+      int ret = job.go();
+      assertEquals(0, ret);
+      checkOutput();
     } finally {
       try {
         INPUT_FILE.delete();

Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Tue Dec 15 20:55:58 2009
@@ -1507,9 +1507,11 @@
         <li><code>${mapreduce.cluster.local.dir}/taskTracker/jobcache/$jobid/jars/</code>
         : The jars directory, which has the job jar file and expanded jar.
         The <code>job.jar</code> is the application's jar file that is
-        automatically distributed to each machine. It is expanded in jars
-        directory before the tasks for the job start. The job.jar location
-        is accessible to the application through the api
+        automatically distributed to each machine. Any library jars that are dependencies
+        of the application code may be packaged inside this jar in a <code>lib/</code> directory.
+        This directory is extracted from <code>job.jar</code> and its contents are
+        automatically added to the classpath for each task.
+        The job.jar location is accessible to the application through the api
         <a href="ext:api/org/apache/hadoop/mapred/jobconf/getjar"> 
         JobConf.getJar() </a>. To access the unjarred directory,
         JobConf.getJar().getParent() can be called.</li>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Dec 15 20:55:58 2009
@@ -24,6 +24,7 @@
 import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -165,6 +166,10 @@
   static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
     JobContext.REDUCE_MEMORY_MB;
 
+  /** Pattern for the default unpacking behavior for job jars */
+  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
+    Pattern.compile("(?:classes/|lib/).*");
+
   /**
    * Configuration key to set the java command line options for the child
    * map and reduce tasks.
@@ -418,6 +423,14 @@
    * @param jar the user jar for the map-reduce job.
    */
   public void setJar(String jar) { set(JobContext.JAR, jar); }
+
+  /**
+   * Get the pattern for jar contents to unpack on the tasktracker
+   */
+  public Pattern getJarUnpackPattern() {
+    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
+  }
+
   
   /**
    * Set the job's jar file by finding an example class location.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Dec 15 20:55:58 2009
@@ -649,7 +649,7 @@
       }
     }
     classPaths.add(new File(jobCacheDir, "classes").toString());
-    classPaths.add(jobCacheDir.toString());
+    classPaths.add(new File(jobCacheDir, "job.jar").toString());
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Dec 15 20:55:58 2009
@@ -121,6 +121,7 @@
   static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
     "mapred.tasktracker.pmem.reserved";
 
+
   static final long WAIT_FOR_DONE = 3 * 1000;
   private int httpPort;
 
@@ -1008,10 +1009,11 @@
 
       localJobConf.setJar(localJarFile.toString());
 
-      // Also un-jar the job.jar files. We un-jar it so that classes inside
-      // sub-directories, for e.g., lib/, classes/ are available on class-path
-      RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
-          .getParent().toString()));
+      // Un-jar the parts of the job.jar that need to be added to the classpath
+      RunJar.unJar(
+        new File(localJarFile.toString()),
+        new File(localJarFile.getParent().toString()),
+        localJobConf.getJarUnpackPattern());
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Dec 15 20:55:58 2009
@@ -55,6 +55,7 @@
   public static final String JAR = "mapreduce.job.jar";
   public static final String ID = "mapreduce.job.id";
   public static final String JOB_NAME = "mapreduce.job.name";
+  public static final String JAR_UNPACK_PATTERN = "mapreduce.job.jar.unpack.pattern";
   public static final String USER_NAME = "mapreduce.job.user.name";
   public static final String PRIORITY = "mapreduce.job.priority";
   public static final String QUEUE_NAME = "mapreduce.job.queuename";