Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/19 23:48:08 UTC

svn commit: r415438 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/task/

Author: cutting
Date: Mon Jun 19 14:48:07 2006
New Revision: 415438

URL: http://svn.apache.org/viewvc?rev=415438&view=rev
Log:
HADOOP-123.  Add MapReduce unit tests that run a jobtracker and tasktracker, greatly increasing code coverage.  Contributed by Milind.

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 19 14:48:07 2006
@@ -12,6 +12,10 @@
  3. HADOOP-250.  Add an HTTP user interface to the namenode, running
     on port 50070. (Devaraj Das via cutting)
 
+ 4. HADOOP-123.  Add MapReduce unit tests that run a jobtracker and
+    tasktracker, greatly increasing code coverage.
+    (Milind Bhandarkar via cutting)
+
 
 Release 0.3.2 - 2006-06-09
 

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Mon Jun 19 14:48:07 2006
@@ -28,12 +28,14 @@
   <property name="build.examples" value="${build.dir}/examples"/>
   <property name="build.libhdfs" value="${build.dir}/libhdfs"/>
   <property name="build.docs" value="${build.dir}/docs"/>
+  <property name="build.minimr" value="${build.dir}/minimr"/>
   <property name="build.javadoc" value="${build.docs}/api"/>
   <property name="build.encoding" value="ISO-8859-1"/>
 
   <property name="test.src.dir" value="${basedir}/src/test"/>
   <property name="test.build.dir" value="${build.dir}/test"/>
   <property name="test.build.data" value="${test.build.dir}/data"/>
+  <property name="hadoop.log.dir" value="${test.build.dir}/logs"/>
   <property name="test.build.classes" value="${test.build.dir}/classes"/>
   <property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
   <property name="test.include" value="Test*"/>
@@ -65,6 +67,7 @@
   <!-- the unit test classpath: uses test.src.dir for configuration -->
   <path id="test.classpath">
     <pathelement location="${test.build.classes}" />
+    <pathelement location="${build.minimr}" />
     <pathelement location="${test.src.dir}"/>
     <pathelement location="${build.dir}"/>
     <path refid="classpath"/>
@@ -89,6 +92,7 @@
     <mkdir dir="${build.webapps}/job/WEB-INF"/>
     <mkdir dir="${build.webapps}/dfs/WEB-INF"/>
     <mkdir dir="${build.examples}"/>
+    <mkdir dir="${build.minimr}"/>
  
     <mkdir dir="${test.build.dir}"/>
     <mkdir dir="${test.build.classes}"/>
@@ -258,10 +262,13 @@
 
     <delete dir="${test.build.data}"/>
     <mkdir dir="${test.build.data}"/>
+    <delete dir="${hadoop.log.dir}"/>
+    <mkdir dir="${hadoop.log.dir}"/>
 
     <junit printsummary="yes" haltonfailure="no" fork="yes" dir="${basedir}"
       errorProperty="tests.failed" failureProperty="tests.failed">
       <sysproperty key="test.build.data" value="${test.build.data}"/>
+      <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
       <sysproperty key="test.src.dir" value="${test.src.dir}"/>
       <sysproperty key="hadoop.log.dir" value="."/>
       <classpath refid="${test.classpath.id}"/>

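For reference: tests forked by the junit task above receive each <sysproperty> entry as an ordinary Java system property. A minimal sketch of reading them from a test, assuming only the property names set in this build.xml (the class name and fallback values are illustrative):

    public class SyspropSketch {
        public static void main(String[] args) {
            // Names match the <sysproperty> keys the junit task sets above.
            String dataDir = System.getProperty("test.build.data", "/tmp");
            String logDir = System.getProperty("hadoop.log.dir", ".");
            System.out.println("data=" + dataDir + ", logs=" + logDir);
        }
    }
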
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jun 19 14:48:07 2006
@@ -58,10 +58,12 @@
     public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker");
 
     private static JobTracker tracker = null;
+    private static boolean runTracker = true;
     public static void startTracker(Configuration conf) throws IOException {
       if (tracker != null)
         throw new IOException("JobTracker already running.");
-      while (true) {
+      runTracker = true;
+      while (runTracker) {
         try {
           tracker = new JobTracker(conf);
           break;
@@ -73,13 +75,21 @@
         } catch (InterruptedException e) {
         }
       }
-      tracker.offerService();
+      if (runTracker) { tracker.offerService(); }
     }
 
     public static JobTracker getTracker() {
         return tracker;
     }
 
+    public static void stopTracker() throws IOException {
+      if (tracker == null)
+        throw new IOException("Trying to stop JobTracker that is not running.");
+      runTracker = false;
+      tracker.close();
+      tracker = null;
+    }
+    
     /**
      * A thread to timeout tasks that have been assigned to task trackers,
      * but that haven't reported back yet.
@@ -353,8 +363,11 @@
     private TreeMap taskTrackers = new TreeMap();
     Vector jobInitQueue = new Vector();
     ExpireTrackers expireTrackers = new ExpireTrackers();
+    Thread expireTrackersThread = null;
     RetireJobs retireJobs = new RetireJobs();
+    Thread retireJobsThread = null;
     JobInitThread initJobs = new JobInitThread();
+    Thread initJobsThread = null;
     ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
     Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
     
@@ -439,9 +452,12 @@
 
         this.startTime = System.currentTimeMillis();
 
-        new Thread(this.expireTrackers).start();
-        new Thread(this.retireJobs).start();
-        new Thread(this.initJobs).start();
+        this.expireTrackersThread = new Thread(this.expireTrackers);
+        this.expireTrackersThread.start();
+        this.retireJobsThread = new Thread(this.retireJobs);
+        this.retireJobsThread.start();
+        this.initJobsThread = new Thread(this.initJobs);
+        this.initJobsThread.start();
         expireLaunchingTaskThread.start();
     }
 
@@ -466,8 +482,66 @@
             this.interTrackerServer.join();
         } catch (InterruptedException ie) {
         }
+        LOG.info("Stopped interTrackerServer");
     }
 
+    void close() throws IOException {
+        if (this.infoServer != null) {
+            LOG.info("Stopping infoServer");
+            try {
+                this.infoServer.stop();
+            } catch (InterruptedException ex) {
+                ex.printStackTrace();
+            }
+        }
+        if (this.interTrackerServer != null) {
+            LOG.info("Stopping interTrackerServer");
+            this.interTrackerServer.stop();
+        }
+        if (this.expireTrackers != null) {
+            LOG.info("Stopping expireTrackers");
+            this.expireTrackers.stopTracker();
+            try {
+                this.expireTrackersThread.interrupt();
+                this.expireTrackersThread.join();
+            } catch (InterruptedException ex) {
+                ex.printStackTrace();
+            }
+        }
+        if (this.retireJobs != null) {
+            LOG.info("Stopping retirer");
+            this.retireJobs.stopRetirer();
+            try {
+                this.retireJobsThread.interrupt();
+                this.retireJobsThread.join();
+            } catch (InterruptedException ex) {
+                ex.printStackTrace();
+            }
+        }
+        if (this.initJobs != null) {
+            LOG.info("Stopping initer");
+            this.initJobs.stopIniter();
+            try {
+                this.initJobsThread.interrupt();
+                this.initJobsThread.join();
+            } catch (InterruptedException ex) {
+                ex.printStackTrace();
+            }
+        }
+        if (this.expireLaunchingTaskThread != null) {
+            LOG.info("Stopping expireLaunchingTasks");
+            this.expireLaunchingTasks.stop();
+            try {
+                this.expireLaunchingTaskThread.interrupt();
+                this.expireLaunchingTaskThread.join();
+            } catch (InterruptedException ex) {
+                ex.printStackTrace();
+            }
+        }
+        LOG.info("stopped all jobtracker services");
+        return;
+    }
+    
     ///////////////////////////////////////////////////////
     // Maintain lookup tables; called by JobInProgress
     // and TaskInProgress

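For reference, a minimal sketch of the start/stop lifecycle this change enables, mirroring the JobTrackerRunner added below in MiniMRCluster.java; the class name, port, and sleep are illustrative, not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobTracker;

    public class TrackerLifecycleSketch {
        public static void main(String[] args) throws Exception {
            final JobConf conf = new JobConf();
            conf.set("mapred.job.tracker", "localhost:50000"); // illustrative port
            Thread t = new Thread(new Runnable() {
                public void run() {
                    try {
                        // Retries construction, then blocks in offerService().
                        JobTracker.startTracker(conf);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
            Thread.sleep(2000);        // crude wait for startup, as MiniMRCluster does
            JobTracker.stopTracker();  // clears runTracker and closes all services
            t.join();
        }
    }
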
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Jun 19 14:48:07 2006
@@ -60,6 +60,10 @@
     Server mapOutputServer = null;
     InterTrackerProtocol jobClient;
 
+    StatusHttpServer server = null;
+    
+    boolean shuttingDown = false;
+    
     TreeMap tasks = null;
     TreeMap runningTasks = null;
     int mapTotal = 0;
@@ -145,8 +149,22 @@
         this.justStarted = true;
 
         this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
+        
+        this.running = true;
     }
 
+    public synchronized void shutdown() throws IOException {
+        shuttingDown = true;
+        close();
+        if (this.server != null) {
+            try {
+                LOG.info("Shutting down StatusHttpServer");
+                this.server.stop();
+            } catch (InterruptedException ex) {
+                ex.printStackTrace();
+            }
+        }
+    }
     /**
      * Close down the TaskTracker and all its components.  We must also shutdown
      * any running tasks or threads, and cleanup disk space.  A new TaskTracker
@@ -191,6 +209,8 @@
             mapOutputServer = null;
         }
 
+        this.running = false;
+        
         // Clear local storage
         this.mapOutputFile.cleanupStorage();
     }
@@ -206,7 +226,7 @@
       this.mapOutputFile = new MapOutputFile();
       this.mapOutputFile.setConf(conf);
       int httpPort = conf.getInt("tasktracker.http.port", 50060);
-      StatusHttpServer server = new StatusHttpServer("task", httpPort, true);
+      this.server = new StatusHttpServer("task", httpPort, true);
       int workerThreads = conf.getInt("tasktracker.http.threads", 40);
       server.setThreads(1, workerThreads);
       server.start();
@@ -236,7 +256,7 @@
         long lastHeartbeat = 0;
         this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
 
-        while (running) {
+        while (running && !shuttingDown) {
             long now = System.currentTimeMillis();
 
             long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
@@ -407,26 +427,30 @@
      */
     public void run() {
         try {
-            while (running) {
+            while (running && !shuttingDown) {
                 boolean staleState = false;
                 try {
                     // This while-loop attempts reconnects if we get network errors
-                    while (running && ! staleState) {
+                    while (running && ! staleState && !shuttingDown ) {
                         try {
                             if (offerService() == STALE_STATE) {
                                 staleState = true;
                             }
                         } catch (Exception ex) {
-                            LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "].  Retrying...", ex);
-                            try {
-                                Thread.sleep(5000);
-                            } catch (InterruptedException ie) {
+                            if (!shuttingDown) {
+                                LOG.info("Lost connection to JobTracker [" +
+                                        jobTrackAddr + "].  Retrying...", ex);
+                                try {
+                                    Thread.sleep(5000);
+                                } catch (InterruptedException ie) {
+                                }
                             }
                         }
                     }
                 } finally {
                     close();
                 }
+                if (shuttingDown) { return; }
                 LOG.info("Reinitializing local state");
                 initialize();
             }
@@ -529,18 +553,20 @@
 
             localJobConf = new JobConf(localJobFile);
             localJobConf.set("mapred.task.id", task.getTaskId());
+            localJobConf.set("mapred.local.dir",
+                    this.defaultJobConf.get("mapred.local.dir"));
             String jarFile = localJobConf.getJar();
             if (jarFile != null) {
               fs.copyToLocalFile(new Path(jarFile), localJarFile);
               localJobConf.setJar(localJarFile.toString());
+            }
 
-              FileSystem localFs = FileSystem.getNamed("local", fConf);
-              OutputStream out = localFs.create(localJobFile);
-              try {
-                localJobConf.write(out);
-              } finally {
-                out.close();
-              }
+            FileSystem localFs = FileSystem.getNamed("local", fConf);
+            OutputStream out = localFs.create(localJobFile);
+            try {
+              localJobConf.write(out);
+            } finally {
+              out.close();
             }
             // set the task's configuration to the local job conf
             // rather than the default.
@@ -836,7 +862,7 @@
             
           Task task = umbilical.getTask(taskid);
           JobConf job = new JobConf(task.getJobFile());
-
+          
           defaultConf.addFinalResource(new Path(task.getJobFile()));
 
           startPinging(umbilical, taskid);        // start pinging parent

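The tasktracker side follows the same pattern: shutdown() flips shuttingDown so the run() loop exits instead of reinitializing, then close() and the StatusHttpServer stop release resources. A minimal sketch of driving it, modeled on the TaskTrackerRunner added below; the class name and port are illustrative, and a jobtracker is assumed to be listening already:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TaskTracker;

    public class TaskTrackerLifecycleSketch {
        public static void main(String[] args) throws Exception {
            JobConf jc = new JobConf();
            jc.set("mapred.job.tracker", "localhost:50000"); // assumes a running jobtracker
            TaskTracker tt = new TaskTracker(jc);
            Thread th = new Thread(tt);  // TaskTracker's run() drives the heartbeat loop
            th.start();
            Thread.sleep(2000);
            tt.shutdown();   // sets shuttingDown, so run() returns instead of reinitializing
            th.interrupt();
            th.join();
        }
    }
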
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,210 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This class creates a single-process Map-Reduce cluster for junit testing.
+ * One thread is created for each server.
+ * @author Milind Bhandarkar
+ */
+public class MiniMRCluster {
+    
+    private Thread jobTrackerThread;
+    private JobTrackerRunner jobTracker;
+    private TaskTrackerRunner taskTracker;
+    
+    private int jobTrackerPort = 0;
+    private int taskTrackerPort = 0;
+    
+    private int numTaskTrackers;
+    
+    private ArrayList taskTrackerList = new ArrayList();
+    private ArrayList taskTrackerThreadList = new ArrayList();
+    
+    private String namenode;
+    
+    /**
+     * An inner class that runs a job tracker.
+     */
+    class JobTrackerRunner implements Runnable {
+        /**
+         * Create the job tracker and run it.
+         */
+        public void run() {
+            try {
+                JobConf jc = new JobConf();
+                jc.set("fs.name.node", namenode);
+                jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
+                // this timeout seems to control the minimum time for the test,
+                // so set it down to one second.
+                jc.setInt("ipc.client.timeout", 1000);
+                jc.set("mapred.local.dir","build/test/mapred/local");
+                JobTracker.startTracker(jc);
+            } catch (Throwable e) {
+                System.err.println("Job tracker crashed:");
+                e.printStackTrace();
+            }
+        }
+        
+        /**
+         * Shutdown the job tracker and wait for it to finish.
+         */
+        public void shutdown() {
+            try {
+                JobTracker.stopTracker();
+            } catch (Throwable e) {
+                System.err.println("Unable to shut down job tracker:");
+                e.printStackTrace();
+            }
+        }
+    }
+    
+    /**
+     * An inner class to run the task tracker.
+     */
+    class TaskTrackerRunner implements Runnable {
+        TaskTracker tt;
+        
+        /**
+         * Create and run the task tracker.
+         */
+        public void run() {
+            try {
+                JobConf jc = new JobConf();
+                jc.set("fs.name.node", namenode);
+                jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
+                // this timeout seems to control the minimum time for the test,
+                // so set it down to one second.
+                jc.setInt("ipc.client.timeout", 1000);
+                jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);
+                jc.setInt("mapred.task.tracker.output.port", taskTrackerPort++);
+                jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);
+                File localDir = new File(jc.get("mapred.local.dir"));
+                File ttDir = new File(localDir, Integer.toString(taskTrackerPort));
+                ttDir.mkdirs();
+                jc.set("mapred.local.dir", ttDir.getAbsolutePath());
+                tt = new TaskTracker(jc);
+                tt.run();
+            } catch (Throwable e) {
+                tt = null;
+                System.err.println("Task tracker crashed:");
+                e.printStackTrace();
+            }
+        }
+        
+        /**
+         * Shut down the server and wait for it to finish.
+         */
+        public void shutdown() {
+            if (tt != null) {
+                try {
+                    tt.shutdown();
+                } catch (Throwable e) {
+                    System.err.println("Unable to shut down task tracker:");
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+    
+    /**
+     * Create the config and start up the servers.
+     */
+    public MiniMRCluster(int jobTrackerPort,
+            int taskTrackerPort,
+            int numTaskTrackers,
+            String namenode) throws IOException {
+        this.jobTrackerPort = jobTrackerPort;
+        this.taskTrackerPort = taskTrackerPort;
+        this.numTaskTrackers = numTaskTrackers;
+        this.namenode = namenode;
+        
+        File configDir = new File("build", "minimr");
+        configDir.mkdirs();
+        File siteFile = new File(configDir, "hadoop-site.xml");
+        PrintWriter pw = new PrintWriter(siteFile);
+        pw.print("<?xml version=\"1.0\"?>\n"+
+                "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"+
+                "<configuration>\n"+
+                " <property>\n"+
+                "   <name>mapred.system.dir</name>\n"+
+                "   <value>build/test/mapred/system</value>\n"+
+                " </property>\n"+
+                "</configuration>\n");
+        pw.close();
+        jobTracker = new JobTrackerRunner();
+        jobTrackerThread = new Thread(jobTracker);
+        jobTrackerThread.start();
+        try {                                     // let jobTracker get started
+            Thread.sleep(2000);
+        } catch(InterruptedException e) {
+        }
+        for (int idx = 0; idx < numTaskTrackers; idx++) {
+            TaskTrackerRunner taskTracker = new TaskTrackerRunner();
+            Thread taskTrackerThread = new Thread(taskTracker);
+            taskTrackerThread.start();
+            taskTrackerList.add(taskTracker);
+            taskTrackerThreadList.add(taskTrackerThread);
+        }
+        try {                                     // let taskTrackers get started
+            Thread.sleep(2000);
+        } catch(InterruptedException e) {
+        }
+    }
+    
+    /**
+     * Shut down the servers.
+     */
+    public void shutdown() {
+        try {
+            for (int idx = 0; idx < numTaskTrackers; idx++) {
+                TaskTrackerRunner taskTracker = (TaskTrackerRunner) taskTrackerList.get(idx);
+                Thread taskTrackerThread = (Thread) taskTrackerThreadList.get(idx);
+                taskTracker.shutdown();
+                taskTrackerThread.interrupt();
+                try {
+                    taskTrackerThread.join();
+                } catch (InterruptedException ex) {
+                    ex.printStackTrace();
+                }
+            }
+            jobTracker.shutdown();
+            jobTrackerThread.interrupt();
+            try {
+                jobTrackerThread.join();
+            } catch (InterruptedException ex) {
+                ex.printStackTrace();
+            }
+        } finally {
+            File configDir = new File("build", "minimr");
+            File siteFile = new File(configDir, "hadoop-site.xml");
+            siteFile.delete();
+        }
+    }
+    
+    public static void main(String[] args) throws IOException {
+        System.out.println("Bringing up JobTracker and TaskTrackers.");
+        MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local");
+        System.out.println("JobTracker and TaskTrackers are up.");
+        mr.shutdown();
+        System.out.println("JobTracker and TaskTrackers brought down.");
+    }
+}
+

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,212 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A Map-Reduce program to estimate the value of Pi using the Monte Carlo
+ * method.
+ *
+ * @author Milind Bhandarkar
+ */
+public class PiEstimator {
+  
+  /**
+   * Mapper class for Pi estimation.
+   */
+  
+  public static class PiMapper extends MapReduceBase implements Mapper {
+    
+    /** Mapper configuration.
+     *
+     */
+    public void configure(JobConf job) {
+    }
+    
+    static Random r = new Random();
+    
+    /** Map method.
+     * @param key the number of random samples to generate
+     * @param val not used
+     * @param out collects (1,1) for each point inside the circle and (0,1) outside
+     * @param reporter used to report sampling progress
+     */
+    public void map(WritableComparable key,
+        Writable val,
+        OutputCollector out,
+        Reporter reporter) throws IOException {
+        int nSamples = ((IntWritable) key).get();
+        for(int idx = 0; idx < nSamples; idx++) {
+            double x = r.nextDouble();
+            double y = r.nextDouble();
+            double d = (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5);
+            if (d > 0.25) {
+                out.collect(new IntWritable(0), new IntWritable(1));
+            } else {
+                out.collect(new IntWritable(1), new IntWritable(1));
+            }
+            if (idx%100 == 1) {
+                reporter.setStatus("Generated "+idx+" samples.");
+            }
+        }
+    }
+    
+    public void close() {
+      // nothing
+    }
+  }
+  
+  public static class PiReducer extends MapReduceBase implements Reducer {
+      int numInside = 0;
+      int numOutside = 0;
+      JobConf conf;
+      
+      /** Reducer configuration.
+       *
+       */
+      public void configure(JobConf job) {
+          conf = job;
+      }
+      /** Reduce method.
+       * @param key
+       * @param values
+       * @param output
+       * @param reporter
+       */
+      public void reduce(WritableComparable key,
+              Iterator values,
+              OutputCollector output,
+              Reporter reporter) throws IOException {
+          if (((IntWritable)key).get() == 1) {
+              while (values.hasNext()) {
+                  int num = ((IntWritable)values.next()).get();
+                  numInside += num;
+              }
+          } else {
+              while (values.hasNext()) {
+                  int num = ((IntWritable)values.next()).get();
+                  numOutside += num;
+              }
+          }
+      }
+      
+      public void close() throws IOException {
+        Path tmpDir = new Path("test-mini-mr");
+        Path outDir = new Path(tmpDir, "out");
+        Path outFile = new Path(outDir, "reduce-out");
+        FileSystem fileSys = FileSystem.get(conf);
+        SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, outFile,
+              IntWritable.class, IntWritable.class);
+        writer.append(new IntWritable(numInside), new IntWritable(numOutside));
+        writer.close();
+      }
+  }
+
+  /**
+   * This is the main driver for computing the value of Pi using
+   * monte-carlo method.
+   */
+  static double launch(int numMaps, int numPoints, String jt, String dfs)
+  throws IOException {
+
+    Configuration conf = new Configuration();
+    JobConf jobConf = new JobConf(conf, PiEstimator.class);
+    if (jt != null) { jobConf.set("mapred.job.tracker", jt); }
+    if (dfs != null) { jobConf.set("fs.default.name", dfs); }
+    jobConf.setJobName("test-mini-mr");
+    
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+    jobConf.setInputKeyClass(IntWritable.class);
+    jobConf.setInputValueClass(IntWritable.class);
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+        
+    jobConf.setOutputKeyClass(IntWritable.class);
+    jobConf.setOutputValueClass(IntWritable.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    
+    jobConf.setMapperClass(PiMapper.class);
+    jobConf.setReducerClass(PiReducer.class);
+    
+    jobConf.setNumReduceTasks(1);
+
+    Path tmpDir = new Path("test-mini-mr");
+    Path inDir = new Path(tmpDir, "in");
+    Path outDir = new Path(tmpDir, "out");
+    FileSystem fileSys = FileSystem.get(jobConf);
+    fileSys.delete(tmpDir);
+    fileSys.mkdirs(inDir);
+    
+    jobConf.setInputPath(inDir);
+    jobConf.setOutputPath(outDir);
+    
+    jobConf.setNumMapTasks(numMaps);
+    
+    for(int idx=0; idx < numMaps; ++idx) {
+      Path file = new Path(inDir, "part"+idx);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file,
+              IntWritable.class, IntWritable.class);
+      writer.append(new IntWritable(numPoints), new IntWritable(0));
+      writer.close();
+    }
+    
+    double estimate = 0.0;
+    
+    try {
+      JobClient.runJob(jobConf);
+      Path inFile = new Path(outDir, "reduce-out");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
+              jobConf);
+      IntWritable numInside = new IntWritable();
+      IntWritable numOutside = new IntWritable();
+      reader.next(numInside, numOutside);
+      reader.close();
+      estimate = (double) (numInside.get()*4.0)/(numMaps*numPoints);
+    } finally {
+      fileSys.delete(tmpDir);
+    }
+    
+    return estimate;
+  }
+  
+  /**
+     * Launches all the tasks in order.
+     */
+    public static void main(String[] argv) throws Exception {
+        if (argv.length < 2) {
+            System.err.println("Usage: PiEstimator <nMaps> <nSamples>");
+            return;
+        }
+
+        int nMaps = Integer.parseInt(argv[0]);
+        int nSamples = Integer.parseInt(argv[1]);
+        
+        System.out.println("Estimated value of PI is "+
+                launch(nMaps, nSamples, null, null));
+    }
+}

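The estimator above rests on the standard Monte Carlo argument: a point drawn uniformly from the unit square lands inside the inscribed circle of radius 0.5 with probability (pi * 0.5^2) / 1 = pi/4, so pi is approximately 4 * numInside / numSamples. A self-contained single-process sketch of the same computation (class name and sample count are illustrative):

    import java.util.Random;

    public class PiSketch {
        public static void main(String[] args) {
            Random r = new Random();
            int numSamples = 1000000;
            int numInside = 0;
            for (int i = 0; i < numSamples; i++) {
                // Center the unit square on the origin, as PiMapper does via (x-0.5, y-0.5).
                double x = r.nextDouble() - 0.5;
                double y = r.nextDouble() - 0.5;
                if (x * x + y * y <= 0.25) {  // inside the radius-0.5 circle
                    numInside++;
                }
            }
            System.out.println("Estimated value of PI is " +
                    (4.0 * numInside / numSamples));
        }
    }
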
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import junit.framework.TestCase;
+
+/**
+ * A unit test for bring-up and shutdown of the mini map-reduce cluster.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestMiniMRBringup extends TestCase {
+
+    public void testBringUp() throws IOException {
+      MiniMRCluster mr = null;
+      try {
+          mr = new MiniMRCluster(50000, 50010, 1, "local");
+      } finally {
+          if (mr != null) { mr.shutdown(); }
+      }
+    }
+  
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import junit.framework.TestCase;
+
+/**
+ * A JUnit test for the mini map-reduce cluster with the local file system.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestMiniMRLocalFS extends TestCase {
+  
+    static final int NUM_MAPS = 10;
+    static final int NUM_SAMPLES = 100000;
+    
+  public void testWithLocal() throws IOException {
+      MiniMRCluster mr = null;
+      try {
+          mr = new MiniMRCluster(60030, 60040, 2, "local");
+          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
+          double error = Math.abs(Math.PI - estimate);
+          assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+      } finally {
+          if (mr != null) { mr.shutdown(); }
+      }
+  }
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * A JUnit test for the mini map-reduce cluster with mini-DFS.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestMiniMRWithDFS extends TestCase {
+  
+    static final int NUM_MAPS = 10;
+    static final int NUM_SAMPLES = 100000;
+    
+  public void testWithDFS() throws IOException {
+      String namenode = null;
+      MiniDFSCluster dfs = null;
+      MiniMRCluster mr = null;
+      FileSystem fileSys = null;
+      try {
+          Configuration conf = new Configuration();
+          dfs = new MiniDFSCluster(65314, conf);
+          fileSys = dfs.getFileSystem();
+          namenode = fileSys.getName();
+          mr = new MiniMRCluster(50050, 50060, 4, namenode);
+          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:50050", namenode);
+          double error = Math.abs(Math.PI - estimate);
+          assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+      } finally {
+          if (fileSys != null) { fileSys.close(); }
+          if (dfs != null) { dfs.shutdown(); }
+          if (mr != null) { mr.shutdown(); }
+      }
+  }
+  
+}

Modified: lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp Mon Jun 19 14:48:07 2006
@@ -6,10 +6,10 @@
   import="javax.servlet.http.*"
   import="java.io.*"
   import="java.util.*"
-  import="java.util.logging.Logger"
   import="org.apache.hadoop.fs.*"
   import="org.apache.hadoop.mapred.*"
   import="org.apache.hadoop.util.*"
+  import="org.apache.commons.logging.*"
 %><%
   String mapId = request.getParameter("map");
   String reduceId = request.getParameter("reduce");
@@ -35,8 +35,8 @@
   } catch (IOException ie) {
     TaskTracker tracker = 
        (TaskTracker) application.getAttribute("task.tracker");
-    Logger log = (Logger) application.getAttribute("log");
-    log.warning("Http server (getMapOutput.jsp): " +
+    Log log = (Log) application.getAttribute("log");
+    log.warn("Http server (getMapOutput.jsp): " +
                 StringUtils.stringifyException(ie));
     tracker.mapOutputLost(mapId);
     throw ie;
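
The JSP now logs through the same commons-logging API used elsewhere in mapred (compare the LOG field in JobTracker above): Log replaces java.util.logging.Logger, and warn() replaces warning(). A minimal sketch of the pattern, with an illustrative logger name:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LogSketch {
        // Same idiom as JobTracker's LOG field; the category name is illustrative.
        private static final Log LOG =
                LogFactory.getLog("org.apache.hadoop.mapred.example");

        public static void main(String[] args) {
            LOG.info("status message");
            LOG.warn("problem report");  // java.util.logging's warning() equivalent
        }
    }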