You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/25 09:57:44 UTC

svn commit: r425322 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: cutting
Date: Tue Jul 25 00:57:43 2006
New Revision: 425322

URL: http://svn.apache.org/viewvc?rev=425322&view=rev
Log:
HADOOP-381.  Permit developers to save temporary files for tasks whose names match a regular expression.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=425322&r1=425321&r2=425322&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 25 00:57:43 2006
@@ -69,6 +69,10 @@
 19. HADOOP-260.  Add --config option to shell scripts, specifying an
     alternate configuration directory. (Milind Bhandarkar via cutting)
 
+20. HADOOP-381.  Permit developers to save the temporary files for
+    tasks whose names match a regular expression, to facilliate
+    debugging.  (omalley via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?rev=425322&r1=425321&r2=425322&view=diff
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Tue Jul 25 00:57:43 2006
@@ -70,6 +70,7 @@
     <pathelement location="${build.minimr}" />
     <pathelement location="${test.src.dir}"/>
     <pathelement location="${build.dir}"/>
+    <pathelement location="${build.examples}"/>
     <path refid="classpath"/>
   </path>
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=425322&r1=425321&r2=425322&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Tue Jul 25 00:57:43 2006
@@ -371,6 +371,24 @@
 </property>
 
 <property>
+  <name>keep.failed.task.files</name>
+  <value>false</value>
+  <description>Should the files for failed tasks be kept. This should only be 
+               used on jobs that are failing, because the storage is never
+               reclaimed. It also prevents the map outputs from being erased
+               from the reduce directory as they are consumed.</description>
+</property>
+
+<!-- 
+  <property>
+  <name>keep.task.files.pattern</name>
+  <value>.*_m_123456_0</value>
+  <description>Keep all files from tasks whose task names match the given
+               regular expression. Defaults to none.</description>
+  </property>
+-->
+
+<property>
   <name>mapred.compress.map.output</name>
   <value>false</value>
   <description>Should the outputs of the maps be compressed before being

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=425322&r1=425321&r2=425322&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Jul 25 00:57:43 2006
@@ -214,6 +214,26 @@
   }
   
   /**
+   * Set a regular expression for task names that should be kept. 
+   * The regular expression ".*_m_000123_0" would keep the files
+   * for the first instance of map 123 that ran.
+   * @param pattern the java.util.regex.Pattern to match against the 
+   *        task names.
+   */
+  public void setKeepTaskFilesPattern(String pattern) {
+    set("keep.task.files.pattern", pattern);
+  }
+  
+  /**
+   * Get the regular expression that is matched against the task names
+   * to see if we need to keep the files.
+   * @return the pattern as a string, if it was set, othewise null
+   */
+  public String getKeepTaskFilesPattern() {
+    return get("keep.task.files.pattern");
+  }
+  
+  /**
    * Set the current working directory for the default file system
    * @param dir the new current working directory
    */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=425322&r1=425321&r2=425322&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jul 25 00:57:43 2006
@@ -26,6 +26,7 @@
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.metrics.ContextFactory;
 import org.apache.hadoop.metrics.MetricsContext;
@@ -692,6 +693,7 @@
         private JobConf defaultJobConf;
         private JobConf localJobConf;
         private boolean keepFailedTaskFiles;
+        private boolean alwaysKeepTaskFiles;
 
         /**
          */
@@ -742,6 +744,13 @@
             // rather than the default.
             t.setConf(localJobConf);
             keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+            String keepPattern = localJobConf.getKeepTaskFilesPattern();
+            if (keepPattern != null) {
+              alwaysKeepTaskFiles = 
+                Pattern.matches(keepPattern, task.getTaskId());
+            } else {
+              alwaysKeepTaskFiles = false;
+            }
         }
 
         /**
@@ -916,7 +925,9 @@
             LOG.debug("Cleaning up " + taskId);
             synchronized (TaskTracker.this) {
                tasks.remove(taskId);
-               if (runstate == TaskStatus.FAILED && keepFailedTaskFiles) {
+               if (alwaysKeepTaskFiles ||
+                   (runstate == TaskStatus.FAILED && 
+                       keepFailedTaskFiles)) {
                  return;
                }
                synchronized (this) {
@@ -1159,6 +1170,14 @@
         if( !writable )
             throw new DiskErrorException( 
                     "all local directories are not writable" );
+    }
+    
+    /**
+     * Is this task tracker idle?
+     * @return has this task tracker finished and cleaned up all of its tasks?
+     */
+    public synchronized boolean isIdle() {
+      return tasks.isEmpty();
     }
     
     /**

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=425322&r1=425321&r2=425322&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Jul 25 00:57:43 2006
@@ -16,8 +16,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
-import java.util.ArrayList;
-import org.apache.hadoop.conf.Configuration;
+import java.util.*;
 
 /**
  * This class creates a single-process Map-Reduce cluster for junit testing.
@@ -28,15 +27,14 @@
     
     private Thread jobTrackerThread;
     private JobTrackerRunner jobTracker;
-    private TaskTrackerRunner taskTracker;
     
     private int jobTrackerPort = 0;
     private int taskTrackerPort = 0;
     
     private int numTaskTrackers;
     
-    private ArrayList taskTrackerList = new ArrayList();
-    private ArrayList taskTrackerThreadList = new ArrayList();
+    private List taskTrackerList = new ArrayList();
+    private List taskTrackerThreadList = new ArrayList();
     
     private String namenode;
     
@@ -81,6 +79,7 @@
      */
     class TaskTrackerRunner implements Runnable {
         TaskTracker tt;
+        String localDir;
         
         /**
          * Create and run the task tracker.
@@ -99,6 +98,7 @@
                 File localDir = new File(jc.get("mapred.local.dir"));
                 File ttDir = new File(localDir, Integer.toString(taskTrackerPort));
                 ttDir.mkdirs();
+                this.localDir = ttDir.getAbsolutePath();
                 jc.set("mapred.local.dir", ttDir.getAbsolutePath());
                 tt = new TaskTracker(jc);
                 tt.run();
@@ -110,6 +110,14 @@
         }
         
         /**
+         * Get the local dir for this TaskTracker.
+         * @return the absolute pathname
+         */
+        public String getLocalDir() {
+          return localDir;
+        }
+        
+        /**
          * Shut down the server and wait for it to finish.
          */
         public void shutdown() {
@@ -122,6 +130,39 @@
                 }
             }
         }
+    }
+    
+    /**
+     * Get the local directory for the Nth task tracker
+     * @param taskTracker the index of the task tracker to check
+     * @return the absolute pathname of the local dir
+     */
+    public String getTaskTrackerLocalDir(int taskTracker) {
+      return ((TaskTrackerRunner) 
+              taskTrackerList.get(taskTracker)).getLocalDir();
+    }
+
+    /**
+     * Get the number of task trackers in the cluster
+     */
+    public int getNumTaskTrackers() {
+      return taskTrackerList.size();
+    }
+    
+    /**
+     * Wait until the system is idle.
+     */
+    public void waitUntilIdle() {
+      for(Iterator itr= taskTrackerList.iterator(); itr.hasNext(); ) {
+        TaskTracker tracker = ((TaskTrackerRunner) itr.next()).tt;
+        while (!tracker.isIdle()) {
+          System.out.println("Waiting for task tracker " + tracker.getName() +
+                             " to finish.");
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) {}
+        }
+      }
     }
     
     /**

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=425322&r1=425321&r2=425322&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Jul 25 00:57:43 2006
@@ -16,11 +16,16 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
+import java.io.*;
+import java.util.*;
 import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.examples.WordCount;
 
 /**
  * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS.
@@ -31,21 +36,139 @@
   
     static final int NUM_MAPS = 10;
     static final int NUM_SAMPLES = 100000;
+  
+  public static String launchWordCount(String fileSys,
+                                       String jobTracker,
+                                       JobConf conf,
+                                       String input,
+                                       int numMaps,
+                                       int numReduces) throws IOException {
+    final Path inDir = new Path("/testing/wc/input");
+    final Path outDir = new Path("/testing/wc/output");
+    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    fs.delete(outDir);
+    fs.mkdirs(inDir);
+    {
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+    conf.set("fs.default.name", fileSys);
+    conf.set("mapred.job.tracker", jobTracker);
+    conf.setJobName("wordcount");
+    
+    // the keys are words (strings)
+    conf.setOutputKeyClass(UTF8.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
     
+    conf.setMapperClass(WordCount.MapClass.class);        
+    conf.setCombinerClass(WordCount.Reduce.class);
+    conf.setReducerClass(WordCount.Reduce.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReduces);
+    JobClient.runJob(conf);
+    StringBuffer result = new StringBuffer();
+    {
+      Path[] fileList = fs.listPaths(outDir);
+      for(int i=0; i < fileList.length; ++i) {
+        BufferedReader file = 
+          new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
+        String line = file.readLine();
+        while (line != null) {
+          result.append(line);
+          result.append("\n");
+          line = file.readLine();
+        }
+        file.close();
+      }
+    }
+    return result.toString();
+  }
+  
+  /**
+   * Make sure that there are exactly the directories that we expect to find.
+   * @param mr the map-reduce cluster
+   * @param taskDirs the task ids that should be present
+   */
+  private static void checkTaskDirectories(MiniMRCluster mr,
+                                           String[] taskDirs) {
+    mr.waitUntilIdle();
+    int trackers = mr.getNumTaskTrackers();
+    List neededDirs = new ArrayList(Arrays.asList(taskDirs));
+    boolean[] found = new boolean[taskDirs.length];
+    for(int i=0; i < trackers; ++i) {
+      File localDir = new File(mr.getTaskTrackerLocalDir(i));
+      File trackerDir = new File(localDir, "taskTracker");
+      assertTrue("local dir " + localDir + " does not exist.", 
+                   localDir.isDirectory());
+      assertTrue("task tracker dir " + trackerDir + " does not exist.", 
+                   trackerDir.isDirectory());
+      String contents[] = localDir.list();
+      String trackerContents[] = trackerDir.list();
+      for(int j=0; j < contents.length; ++j) {
+        System.out.println("Local " + localDir + ": " + contents[j]);
+      }
+      for(int j=0; j < trackerContents.length; ++j) {
+        System.out.println("Local " + trackerDir + ": " + trackerContents[j]);
+      }
+      for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
+        String name = contents[fileIdx];
+        if (!("taskTracker".equals(contents[fileIdx]))) {
+          int idx = neededDirs.indexOf(name);
+          assertTrue("Spurious directory " + name + " found in " +
+                     localDir, idx != -1);
+          assertTrue("Matching output directory not found " + name +
+                     " in " + trackerDir, 
+                     new File(trackerDir, name).isDirectory());
+          found[idx] = true;
+        }  
+      }
+      assertTrue("The local directory had " + contents.length + 
+                 " and task tracker directory had " + trackerContents.length +
+                 " items.", contents.length == trackerContents.length + 1);
+    }
+    for(int i=0; i< found.length; i++) {
+      assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
+    }
+  }
+  
   public void testWithDFS() throws IOException {
       String namenode = null;
       MiniDFSCluster dfs = null;
       MiniMRCluster mr = null;
       FileSystem fileSys = null;
       try {
+          final int taskTrackers = 4;
+          final int jobTrackerPort = 50050;
+          final String jobTrackerName = "localhost:" + jobTrackerPort;
           Configuration conf = new Configuration();
           dfs = new MiniDFSCluster(65314, conf, true);
           fileSys = dfs.getFileSystem();
           namenode = fileSys.getName();
-          mr = new MiniMRCluster(50050, 50060, 4, namenode, true);
-          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:50050", namenode);
+          mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, 
+                                 namenode, true);
+          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
+                                               jobTrackerName, namenode);
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+          checkTaskDirectories(mr, new String[]{});
+          
+          // Run a word count example
+          JobConf jobConf = new JobConf();
+          // Keeping tasks that match this pattern
+          jobConf.setKeepTaskFilesPattern("task_[0-9]*_m_000001_.*");
+          String result;
+          result = launchWordCount(namenode, jobTrackerName, jobConf, 
+                                   "The quick brown fox\nhas many silly\n" + 
+                                   "red fox sox\n",
+                                   3, 1);
+          assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+                       "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
+          checkTaskDirectories(mr, new String[]{"task_0002_m_000001_0"});
+          
       } finally {
           if (fileSys != null) { fileSys.close(); }
           if (dfs != null) { dfs.shutdown(); }