You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/19 05:56:35 UTC

svn commit: r465465 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/test/testjar/

Author: cutting
Date: Wed Oct 18 20:56:34 2006
New Revision: 465465

URL: http://svn.apache.org/viewvc?view=rev&rev=465465
Log:
HADOOP-607 & HADOOP-609.  Fix a critical bug where classes included in job jars were not found by tasks.  Also add a unit test that checks for this, and modify unit tests to use multiple local directories.  Contributed by Mahadev.

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    lucene/hadoop/trunk/src/test/testjar/
    lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 18 20:56:34 2006
@@ -28,6 +28,16 @@
     than ArrayOutOfBoundsException.  (Dhruba Borthakur via cutting) 
 
 
+Release 0.7.2 - 2006-10-18
+
+ 1. HADOOP-607.  Fix a bug where classes included in job jars were not
+    found by tasks.  (Mahadev Konar via cutting)
+
+ 2. HADOOP-609.  Add a unit test that checks that classes in job jars
+    can be found by tasks.  Also modify unit tests to specify multiple
+    local directories.  (Mahadev Konar via cutting)
+
+
 Release 0.7.1 - 2006-10-11
 
  1. HADOOP-593.  Fix a NullPointerException in the JobTracker.

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Wed Oct 18 20:56:34 2006
@@ -39,6 +39,7 @@
   <property name="test.cache.data" value="${test.build.dir}/cache"/>
   <property name="hadoop.log.dir" value="${test.build.dir}/logs"/>
   <property name="test.build.classes" value="${test.build.dir}/classes"/>
+  <property name="test.build.testjar" value="${test.build.dir}/testjar"/>
   <property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
   <property name="test.include" value="Test*"/>
   <property name="test.classpath.id" value="test.classpath"/>
@@ -104,7 +105,7 @@
  
     <mkdir dir="${test.build.dir}"/>
     <mkdir dir="${test.build.classes}"/>
-
+    <mkdir dir="${test.build.testjar}"/>
     <touch datetime="01/25/1971 2:00 pm">
       <fileset dir="${conf.dir}" includes="**/*.template"/>
     </touch>
@@ -282,7 +283,24 @@
      source="${javac.version}"
      deprecation="${javac.deprecation}">
       <classpath refid="test.classpath"/>
-    </javac>    
+    </javac> 
+    <javac
+     encoding="${build.encoding}"
+     srcdir="${test.src.dir}/testjar"
+     includes="*.java"
+     destdir="${test.build.testjar}"
+     debug="${javac.debug}"
+     optimize="${javac.optimize}"
+     target="${javac.version}"
+     source="${javac.version}"
+     deprecation="${javac.deprecation}">
+      <classpath refid="test.classpath"/>
+    </javac>                                 
+    <delete file="${test.build.testjar}/testjob.jar"/> 
+    <jar jarfile="${test.build.testjar}/testjob.jar"
+     basedir="${test.build.testjar}">
+    </jar>
+                                                              
     <jar jarfile="${build.dir}/${final.name}-test.jar"
          basedir="${test.build.classes}">
          <manifest>
@@ -294,7 +312,7 @@
     <mkdir dir="${test.cache.data}"/>
     <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.txt" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.jar" todir="${test.cache.data}"/>
-    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>    
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>
   </target>
 
   <!-- ================================================================== -->

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Oct 18 20:56:34 2006
@@ -817,9 +817,8 @@
         
         private void localizeTask(Task task) throws IOException{
             Path localTaskDir =
-              new Path(this.defaultJobConf.getLocalPath(SUBDIR+ Path.SEPARATOR
-                    + JOBCACHE + Path.SEPARATOR
-                    + task.getJobId()), task.getTaskId());
+              new Path(this.defaultJobConf.getLocalPath(TaskTracker.getJobCacheSubdir()), 
+                (task.getJobId() + Path.SEPARATOR + task.getTaskId()));
            FileSystem localFs = FileSystem.getNamed("local", fConf);
            localFs.mkdirs(localTaskDir);
            Path localTaskFile = new Path(localTaskDir, "job.xml");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Wed Oct 18 20:56:34 2006
@@ -79,9 +79,16 @@
      */
     class TaskTrackerRunner implements Runnable {
         TaskTracker tt;
-        String localDir;
+        // the localDirs for this taskTracker
+        String[] localDir;
         boolean isInitialized = false;
         boolean isDead = false;
+        int numDir;       
+        TaskTrackerRunner(int numDir) {
+          this.numDir = numDir;
+          // one slot per local dir; run() always fills slot 0, so keep at least one
+          localDir = new String[Math.max(1, numDir)];
+        }
         
         /**
          * Create and run the task tracker.
@@ -97,10 +104,19 @@
                 jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);
                 jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);
                 File localDir = new File(jc.get("mapred.local.dir"));
-                File ttDir = new File(localDir, Integer.toString(taskTrackerPort));
+                String mapredDir = "";
+                File ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + 0);
                 ttDir.mkdirs();
-                this.localDir = ttDir.getAbsolutePath();
-                jc.set("mapred.local.dir", ttDir.getAbsolutePath());
+                this.localDir[0] = ttDir.getAbsolutePath();
+                mapredDir = ttDir.getAbsolutePath();
+                for (int i = 1; i < numDir; i++){
+                  ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + i);
+                  ttDir.mkdirs();
+                  this.localDir[i] = ttDir.getAbsolutePath();
+                  mapredDir = mapredDir + "," + ttDir.getAbsolutePath();
+                }
+                jc.set("mapred.local.dir", mapredDir);
+                System.out.println("mapred.local.dir is " +  mapredDir);
                 tt = new TaskTracker(jc);
                 isInitialized = true;
                 tt.run();
@@ -114,12 +130,17 @@
         
         /**
          * Get the local dir for this TaskTracker.
+         * This is there so that we do not break
+         * previous tests. 
          * @return the absolute pathname
          */
         public String getLocalDir() {
-          return localDir;
+          return localDir[0];
         }
-        
+        /** Returns the internal local-dir array (not a copy); run() fills it. */
+        public String[] getLocalDirs() {
+          return this.localDir;
+        }
         /**
          * Shut down the server and wait for it to finish.
          */
@@ -176,10 +197,18 @@
      * Create the config and start up the servers.
      */
     public MiniMRCluster(int jobTrackerPort,
+                         int taskTrackerPort,
+                         int numTaskTrackers,
+                         String namenode,
+                         boolean taskTrackerFirst) throws IOException {
+        this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, taskTrackerFirst, 1);
+    } 
+  
+    public MiniMRCluster(int jobTrackerPort,
             int taskTrackerPort,
             int numTaskTrackers,
             String namenode,
-            boolean taskTrackerFirst) throws IOException {
+            boolean taskTrackerFirst, int numDir) throws IOException {
         this.jobTrackerPort = jobTrackerPort;
         this.taskTrackerPort = taskTrackerPort;
         this.numTaskTrackers = numTaskTrackers;
@@ -204,7 +233,7 @@
           jobTrackerThread.start();
         }
         for (int idx = 0; idx < numTaskTrackers; idx++) {
-            TaskTrackerRunner taskTracker = new TaskTrackerRunner();
+            TaskTrackerRunner taskTracker = new TaskTrackerRunner(numDir);
             Thread taskTrackerThread = new Thread(taskTracker);
             taskTrackerThread.start();
             taskTrackerList.add(taskTracker);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java Wed Oct 18 20:56:34 2006
@@ -106,7 +106,7 @@
           fileSys = dfs.getFileSystem();
           namenode = fileSys.getName();
           mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, 
-                                 namenode, true);
+                                 namenode, true, 2);
 
           JobConf jobConf = new JobConf();
           boolean result;

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?view=auto&rev=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Wed Oct 18 20:56:34 2006
@@ -0,0 +1,122 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A JUnit test to test Mini Map-Reduce Cluster with multiple directories
+ * and check for correct classpath
+ */
+public class TestMiniMRClasspath extends TestCase {
+  // ClassWordCount is compiled into testjob.jar by build.xml and is named here
+  // only as a string, so the job can succeed only if tasks load the job jar.
+   static String launchWordCount(String fileSys,
+                                       String jobTracker,
+                                       JobConf conf,
+                                       String input,
+                                       int numMaps,
+                                       int numReduces) throws IOException {
+    final Path inDir = new Path("/testing/wc/input");
+    final Path outDir = new Path("/testing/wc/output");
+    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    fs.delete(outDir);
+    fs.mkdirs(inDir);
+    {
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+    conf.set("fs.default.name", fileSys);
+    conf.set("mapred.job.tracker", jobTracker);
+    conf.setJobName("wordcount");
+    conf.setInputFormat(TextInputFormat.class);
+    
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+    
+    conf.set("mapred.mapper.class", "ClassWordCount$MapClass");        
+    conf.set("mapred.combine.class", "ClassWordCount$Reduce");
+    conf.set("mapred.reducer.class", "ClassWordCount$Reduce");
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReduces);
+    // job jar built by build.xml; this relative path assumes cwd = project root
+    conf.setJar("build/test/testjar/testjob.jar");
+    JobClient.runJob(conf);
+    StringBuffer result = new StringBuffer();
+    {
+      Path[] fileList = fs.listPaths(outDir);
+      for(int i=0; i < fileList.length; ++i) {
+        BufferedReader file = 
+          new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
+        String line = file.readLine();
+        while (line != null) {
+          result.append(line);
+          result.append("\n");
+          line = file.readLine();
+        }
+        file.close();
+      }
+    }
+    return result.toString();
+  }
+  /** Word count over a 4-tracker cluster (3 local dirs each) on a MiniDFSCluster. */
+  public void testClassPath() throws IOException {
+      String namenode = null;
+      MiniDFSCluster dfs = null;
+      MiniMRCluster mr = null;
+      FileSystem fileSys = null;
+      try {
+          final int taskTrackers = 4;
+          final int jobTrackerPort = 50050;
+          final String jobTrackerName = "localhost:" + jobTrackerPort;
+          Configuration conf = new Configuration();
+          dfs = new MiniDFSCluster(65314, conf, true);
+          fileSys = dfs.getFileSystem();
+          namenode = fileSys.getName();
+          mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, 
+                                 namenode, true, 3);
+          JobConf jobConf = new JobConf();
+          String result;
+          result = launchWordCount(namenode, jobTrackerName, jobConf, 
+                                   "The quick brown fox\nhas many silly\n" + 
+                                   "red fox sox\n",
+                                   3, 1);
+          assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+                       "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
+          
+      } finally {
+          // stop MR first (reverse creation order); it was started against this DFS
+          if (mr != null) { mr.shutdown(); }
+          if (fileSys != null) { fileSys.close(); }
+          if (dfs != null) { dfs.shutdown(); }
+      }
+  }
+  
+}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Wed Oct 18 20:56:34 2006
@@ -42,7 +42,7 @@
       dfs = new MiniDFSCluster(65314, conf, true);
       fileSys = dfs.getFileSystem();
       namenode = fileSys.getName();
-      mr = new MiniMRCluster(50050, 50060, 2, namenode, true);
+      mr = new MiniMRCluster(50050, 50060, 2, namenode, true, 4);
       // run the wordcount example with caching
       boolean ret = MRCaching.launchMRCache("localhost:50050",
                                             "/testing/wc/input",

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Wed Oct 18 20:56:34 2006
@@ -32,7 +32,7 @@
   public void testWithLocal() throws IOException {
       MiniMRCluster mr = null;
       try {
-          mr = new MiniMRCluster(60030, 60040, 2, "local", false);
+          mr = new MiniMRCluster(60030, 60040, 2, "local", false, 3);
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));

Added: lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java?view=auto&rev=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java (added)
+++ lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java Wed Oct 18 20:56:34 2006
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.examples.WordCount;
+
+/**
+ * This is an example Hadoop Map/Reduce application being used for 
+ * TestMiniMRClasspath. Uses the WordCount examples in hadoop.
+ * @author Owen O'Malley
+ *
+ */
+public class ClassWordCount {
+  // Both nested classes are empty: all map/reduce logic is inherited from WordCount.
+  /**
+   * Counts the words in each line (logic inherited from WordCount.MapClass).
+   * For each line of input, break the line into words and emit them as
+   * (<b>word</b>, <b>1</b>).
+   */
+  public static class MapClass extends WordCount.MapClass implements Mapper {
+  }
+  
+  /**
+   * Emits the sum of the input values (logic inherited from WordCount.Reduce).
+   */
+  public static class Reduce extends WordCount.Reduce implements Reducer {
+  }
+}