You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2006/02/02 18:58:44 UTC

svn commit: r374443 [2/2] - in /lucene/nutch/trunk/src: java/org/apache/nutch/fs/ java/org/apache/nutch/mapred/ test/org/apache/nutch/mapred/ webapps/jobtracker/

Added: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java?rev=374443&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java Thu Feb  2 09:58:38 2006
@@ -0,0 +1,445 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.mapred;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.ipc.*;
+import org.apache.nutch.util.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.logging.*;
+
+
+////////////////////////////////////////////////////////
+// TaskInProgress maintains all the info needed for a
+// Task in the lifetime of its owning Job.  A given Task
+// might be speculatively executed or reexecuted, so we
+// need a level of indirection above the running-id itself.
+//
+// A given TaskInProgress contains multiple taskids,
+// 0 or more of which might be executing at any one time.
+// (That's what allows speculative execution.)  A taskid
+// is now *never* recycled.  A TIP allocates enough taskids
+// to account for all the speculation and failures it will
+// ever have to handle.  Once those are up, the TIP is dead.
+//
+////////////////////////////////////////////////////////
+class TaskInProgress {
+    static final int MAX_TASK_EXECS = 10;
+    static final int MAX_TASK_FAILURES = 4;    
+    static final double SPECULATIVE_GAP = 0.2;
+    static final long SPECULATIVE_LAG = 60 * 1000;
+
+    public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.mapred.TaskInProgress");
+
+    // Defines the TIP
+    String jobFile = null;
+    FileSplit split = null;
+    TaskInProgress predecessors[] = null;
+    int partition;
+    JobTracker jobtracker;
+    String id;
+    String totalTaskIds[];
+    JobInProgress job;
+
+    // Status of the TIP
+    int numTaskFailures = 0;
+    double progress = 0;
+    long startTime = 0;
+    int completes = 0;
+    boolean failed = false;
+    TreeSet usableTaskIds = new TreeSet();
+    TreeSet recentTasks = new TreeSet();
+    NutchConf nutchConf;
+    
+    TreeMap taskDiagnosticData = new TreeMap();
+    TreeMap taskStatuses = new TreeMap();
+
+    TreeSet machinesWhereFailed = new TreeSet();
+    TreeSet tasksReportedClosed = new TreeSet();
+
+    /**
+     * Constructor for MapTask
+     */
+    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, NutchConf nutchConf, JobInProgress job) {
+        this.jobFile = jobFile;
+        this.split = split;
+        this.jobtracker = jobtracker;
+        this.job = job;
+        this.nutchConf = nutchConf;
+        init();
+    }
+        
+    /**
+     * Constructor for ReduceTask
+     */
+    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, NutchConf nutchConf, JobInProgress job) {
+        this.jobFile = jobFile;
+        this.predecessors = predecessors;
+        this.partition = partition;
+        this.jobtracker = jobtracker;
+        this.job = job;
+        this.nutchConf = nutchConf;
+        init();
+    }
+
+    /**
+     * Initialization common to Map and Reduce
+     */
+    void init() {
+        this.startTime = System.currentTimeMillis();
+        this.id = "tip_" + jobtracker.createUniqueId();
+        this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES];
+        for (int i = 0; i < totalTaskIds.length; i++) {
+            if (isMapTask()) {
+                totalTaskIds[i] = "task_m_" + jobtracker.createUniqueId();
+            } else {
+                totalTaskIds[i] = "task_r_" + jobtracker.createUniqueId();
+            }
+            usableTaskIds.add(totalTaskIds[i]);
+        }
+    }
+
+    ////////////////////////////////////
+    // Accessors, info, profiles, etc.
+    ////////////////////////////////////
+
+    /**
+     * Return the parent job
+     */
+    public JobInProgress getJob() {
+        return job;
+    }
+    /**
+     * Return an ID for this task, not its component taskid-threads
+     */
+    public String getTIPId() {
+        return this.id;
+    }
+    /**
+     * Whether this is a map task
+     */
+    public boolean isMapTask() {
+        return split != null;
+    }
+    /**
+     */
+    public boolean isComplete() {
+        return (completes > 0);
+    }
+    /**
+     */
+    public boolean isComplete(String taskid) {
+        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+        if (status == null) {
+            return false;
+        }
+        return ((completes > 0) && (status.getRunState() == TaskStatus.SUCCEEDED));
+    }
+    /**
+     */
+    public boolean isFailed() {
+        return failed;
+    }
+    /**
+     * Number of times the TaskInProgress has failed.
+     */
+    public int numTaskFailures() {
+        return numTaskFailures();
+    }
+    /**
+     * Get the overall progress (from 0 to 1.0) for this TIP
+     */
+    public double getProgress() {
+        return progress;
+    }
+    /**
+     * Returns whether a component task-thread should be 
+     * closed because the containing JobInProgress has completed.
+     */
+    public boolean shouldCloseForClosedJob(String taskid) {
+        // If the thing has never been closed,
+        // and it belongs to this TIP,
+        // and this TIP is somehow FINISHED,
+        // then true
+        TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
+        if ((ts != null) &&
+            (! tasksReportedClosed.contains(taskid)) &&
+            (job.getStatus().getRunState() != JobStatus.RUNNING)) {
+            tasksReportedClosed.add(taskid);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * A TaskInProgress might be speculatively executed, and so
+     * can have many taskids simultaneously.  Reduce tasks rely on knowing
+     * their predecessor ids, so they can be sure that all the previous
+     * work has been completed.
+     *
+     * But we don't know ahead of time which task id will actually be
+     * the one that completes for a given Map task.  We don't want the
+     * Reduce task to have to be recreated after Map-completion, or check
+     * in with the JobTracker.  So instead, each TaskInProgress preallocates
+     * all the task-ids it could ever want to run simultaneously.  Then the
+     * Reduce task can be told about all the ids task-ids for a given Map 
+     * TaskInProgress.  If any of the Map TIP's tasks complete, the Reduce
+     * task will know all is well, and can continue.
+     *
+     * Most of the time, only a small number of the possible task-ids will
+     * ever be used.
+     */
+    public String[] getAllPossibleTaskIds() {
+        return totalTaskIds;
+    }
+
+    /**
+     * Creates a "status report" for this task.  Includes the
+     * task ID and overall status, plus reports for all the
+     * component task-threads that have ever been started.
+     */
+    Vector generateSingleReport() {
+        Vector report = new Vector();
+        report.add(getTIPId());
+        report.add("" + progress);
+
+        report.add(new Integer(taskDiagnosticData.size()));
+        for (Iterator it = taskDiagnosticData.keySet().iterator(); it.hasNext(); ) {
+            String taskid = (String) it.next();
+            Vector taskData = (Vector) taskDiagnosticData.get(taskid);
+
+            TaskStatus taskStatus = (TaskStatus) taskStatuses.get(taskid);
+            String taskStateString = taskStatus.getStateString();
+
+            report.add(taskData);
+            report.add(taskStateString);
+        }
+        return report;
+    }
+
+    ////////////////////////////////////////////////
+    // Update methods, usually invoked by the owning
+    // job.
+    ////////////////////////////////////////////////
+    /**
+     * A status message from a client has arrived.
+     * It updates the status of a single component-thread-task,
+     * which might result in an overall TaskInProgress status update.
+     */
+    public void updateStatus(TaskStatus status) {
+        String taskid = status.getTaskId();
+        String diagInfo = status.getDiagnosticInfo();
+        if (diagInfo != null && diagInfo.length() > 0) {
+            Vector diagHistory = (Vector) taskDiagnosticData.get(taskid);
+            if (diagHistory == null) {
+                diagHistory = new Vector();
+                taskDiagnosticData.put(taskid, diagHistory);
+            }
+            diagHistory.add(diagInfo);
+        }
+        taskStatuses.put(taskid, status);
+
+        // Recompute progress
+        recomputeProgress();
+    }
+
+    /**
+     * Indicate that one of the taskids in this TaskInProgress
+     * has failed.
+     */
+    public void failedSubTask(String taskid, String trackerName) {
+        //
+        // Note the failure and its location
+        //
+        LOG.info("Task '" + taskid + "' has been lost.");
+        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+        if (status != null) {
+            status.setRunState(TaskStatus.FAILED);
+        }
+        this.recentTasks.remove(taskid);
+        this.completes--;
+
+        numTaskFailures++;
+        if (numTaskFailures >= MAX_TASK_FAILURES) {
+            LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
+            kill();
+        }
+        machinesWhereFailed.add(trackerName);
+
+        // Ask JobTracker to forget about this task
+        jobtracker.removeTaskEntry(taskid);
+
+        recomputeProgress();
+    }
+
+    /**
+     * Indicate that one of the taskids in this TaskInProgress
+     * has successfully completed!
+     */
+    public void completed(String taskid) {
+        LOG.info("Task '" + taskid + "' has completed.");
+        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+        status.setRunState(TaskStatus.SUCCEEDED);
+        recentTasks.remove(taskid);
+
+        //
+        // Now that the TIP is complete, the other speculative 
+        // subtasks will be closed when the owning tasktracker 
+        // reports in and calls shouldClose() on this object.
+        //
+
+        this.completes++;
+        recomputeProgress();
+    }
+
+    /**
+     * The TIP's been ordered kill()ed.
+     */
+    public void kill() {
+        if (isComplete() || failed) {
+            return;
+        }
+        this.failed = true;
+        recomputeProgress();
+    }
+
+    /**
+     * This method is called whenever there's a status change
+     * for one of the TIP's sub-tasks.  It recomputes the overall 
+     * progress for the TIP.  We examine all sub-tasks and find 
+     * the one that's most advanced (and non-failed).
+     */
+    void recomputeProgress() {
+        if (isComplete()) {
+            this.progress = 1;
+        } else if (failed) {
+            this.progress = 0;
+        } else {
+            double bestProgress = 0;
+            for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) {
+                String taskid = (String) it.next();
+                TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+                if (status.getRunState() == TaskStatus.SUCCEEDED) {
+                    bestProgress = 1;
+                    break;
+                } else if (status.getRunState() == TaskStatus.RUNNING) {
+                    bestProgress = Math.max(bestProgress, status.getProgress());
+                }
+            }
+            this.progress = bestProgress;
+        }
+    }
+
+    /////////////////////////////////////////////////
+    // "Action" methods that actually require the TIP
+    // to do something.
+    /////////////////////////////////////////////////
+    /**
+     * Return whether this TIP has an NDFS cache-driven task 
+     * to run at the given taskTracker.
+     */
+    boolean hasTaskWithCacheHit(String taskTracker, TaskTrackerStatus tts) {
+        if (failed || isComplete() || recentTasks.size() > 0) {
+            return false;
+        } else {
+            try {
+                if (isMapTask()) {
+                    NutchFileSystem fs = NutchFileSystem.get(nutchConf);
+                    String hints[][] = fs.getFileCacheHints(split.getFile(), split.getStart(), split.getLength());
+                    for (int i = 0; i < hints.length; i++) {
+                        for (int j = 0; j < hints[i].length; j++) {
+                            if (hints[i][j].equals(tts.getHost())) {
+                                return true;
+                            }
+                        }
+                    }
+                }
+            } catch (IOException ie) {
+            }
+            return false;
+        }
+    }
+    /**
+     * Return whether this TIP has a non-speculative task to run
+     */
+    boolean hasTask() {
+        if (failed || isComplete() || recentTasks.size() > 0) {
+            return false;
+        } else {
+            for (Iterator it = taskStatuses.values().iterator(); it.hasNext(); ) {
+                TaskStatus ts = (TaskStatus) it.next();
+                if (ts.getRunState() == TaskStatus.RUNNING) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+    /**
+     * Return whether the TIP has a speculative task to run.  We
+     * only launch a speculative task if the current TIP is really
+     * far behind, and has been behind for a non-trivial amount of 
+     * time.
+     */
+    boolean hasSpeculativeTask(double averageProgress) {
+        //
+        // REMIND - mjc - these constants should be examined
+        // in more depth eventually...
+        //
+        if (isMapTask() && 
+            (averageProgress - progress >= SPECULATIVE_GAP) &&
+            (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
+            return true;
+        }
+        return false;
+    }
+    
+    /**
+     * Return a Task that can be sent to a TaskTracker for execution.
+     */
+    public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) {
+        Task t = null;
+        if (hasTaskWithCacheHit(taskTracker, tts) ||
+            hasTask() || 
+            hasSpeculativeTask(avgProgress)) {
+
+            String taskid = (String) usableTaskIds.first();
+            usableTaskIds.remove(taskid);
+
+            if (isMapTask()) {
+                t = new MapTask(jobFile, taskid, split);
+            } else {
+                String mapIdPredecessors[][] = new String[predecessors.length][];
+                for (int i = 0; i < mapIdPredecessors.length; i++) {
+                    mapIdPredecessors[i] = predecessors[i].getAllPossibleTaskIds();
+                }
+                t = new ReduceTask(jobFile, taskid, mapIdPredecessors, partition);
+            }
+            t.setConf(nutchConf);
+
+            recentTasks.add(taskid);
+
+            // Ask JobTracker to note that the task exists
+            jobtracker.createTaskEntry(taskid, taskTracker, this);
+        }
+        return t;
+    }
+}

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java Thu Feb  2 09:58:38 2006
@@ -33,6 +33,7 @@
   public static final Logger LOG =
     LogFormatter.getLogger("org.apache.nutch.mapred.TaskRunner");
 
+  boolean killed = false;
   private Process process;
   private Task t;
   private TaskTracker tracker;
@@ -51,7 +52,7 @@
   /** Called to assemble this task's input.  This method is run in the parent
    * process before the child is spawned.  It should not execute user code,
    * only system code. */
-  public void prepare() throws IOException {}
+  public boolean prepare() throws IOException {return true;}
 
   /** Called when this task's output is no longer needed.
   * This method is run in the parent process after the child exits.  It should
@@ -62,7 +63,9 @@
   public final void run() {
     try {
 
-      prepare();
+      if (! prepare()) {
+        return;
+      }
 
       String sep = System.getProperty("path.separator");
       File workDir = new File(new File(t.getJobFile()).getParent(), "work");
@@ -72,7 +75,7 @@
       // start with same classpath as parent process
       classPath.append(System.getProperty("java.class.path"));
       classPath.append(sep);
-      
+
       JobConf job = new JobConf(t.getJobFile());
       String jar = job.getJar();
       if (jar != null) {                      // if jar exists, it into workDir
@@ -158,6 +161,7 @@
       if (process != null) {
           process.destroy();
       }
+      killed = true;
   }
 
   /**

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java Thu Feb  2 09:58:38 2006
@@ -34,6 +34,7 @@
     public static final int UNASSIGNED = 3;
     
     private String taskid;
+    private boolean isMap;
     private float progress;
     private int runState;
     private String diagnosticInfo;
@@ -41,8 +42,9 @@
 
     public TaskStatus() {}
 
-    public TaskStatus(String taskid, float progress, int runState, String diagnosticInfo, String stateString) {
+    public TaskStatus(String taskid, boolean isMap, float progress, int runState, String diagnosticInfo, String stateString) {
         this.taskid = taskid;
+        this.isMap = isMap;
         this.progress = progress;
         this.runState = runState;
         this.diagnosticInfo = diagnosticInfo;
@@ -50,6 +52,7 @@
     }
     
     public String getTaskId() { return taskid; }
+    public boolean getIsMap() { return isMap; }
     public float getProgress() { return progress; }
     public void setProgress(float progress) { this.progress = progress; } 
     public int getRunState() { return runState; }
@@ -64,6 +67,7 @@
     //////////////////////////////////////////////
     public void write(DataOutput out) throws IOException {
         UTF8.writeString(out, taskid);
+        out.writeBoolean(isMap);
         out.writeFloat(progress);
         out.writeInt(runState);
         UTF8.writeString(out, diagnosticInfo);
@@ -72,6 +76,7 @@
 
     public void readFields(DataInput in) throws IOException {
         this.taskid = UTF8.readString(in);
+        this.isMap = in.readBoolean();
         this.progress = in.readFloat();
         this.runState = in.readInt();
         this.diagnosticInfo = UTF8.readString(in);

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java Thu Feb  2 09:58:38 2006
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nutch.mapred;
+ package org.apache.nutch.mapred;
 
 import org.apache.nutch.fs.*;
 import org.apache.nutch.io.*;
@@ -33,7 +33,6 @@
  * @author Mike Cafarella
  *******************************************************/
 public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {
-    private int maxCurrentTask; 
     static final long WAIT_FOR_DONE = 3 * 1000;
     private long taskTimeout; 
 
@@ -66,6 +65,8 @@
     private NutchConf fConf;
     private MapOutputFile mapOutputFile;
 
+    private int maxCurrentTasks;
+
     /**
      * Start with the local machine name, and the default JobTracker
      */
@@ -77,9 +78,10 @@
      * Start with the local machine name, and the addr of the target JobTracker
      */
     public TaskTracker(InetSocketAddress jobTrackAddr, NutchConf conf) throws IOException {
+        maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
+
         this.fConf = conf;
         this.jobTrackAddr = jobTrackAddr;
-        this.maxCurrentTask = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
         this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
         this.mapOutputFile = new MapOutputFile();
         this.mapOutputFile.setConf(conf);
@@ -93,6 +95,7 @@
      */
     void initialize() throws IOException {
         this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000);
+        LOG.info("Starting tracker " + taskTrackerName);
         this.localHostname = InetAddress.getLocalHost().getHostName();
 
         new JobConf(this.fConf).deleteLocalFiles(SUBDIR);
@@ -108,7 +111,7 @@
         // RPC initialization
         while (true) {
             try {
-                this.taskReportServer = RPC.getServer(this, this.taskReportPort, this.maxCurrentTask, false, this.fConf);
+                this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf);
                 this.taskReportServer.start();
                 break;
             } catch (BindException e) {
@@ -119,7 +122,7 @@
         }
         while (true) {
             try {
-                this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, this.maxCurrentTask, false, this.fConf);
+                this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, maxCurrentTasks, false, this.fConf);
                 this.mapOutputServer.start();
                 break;
             } catch (BindException e) {
@@ -142,9 +145,14 @@
      * clean.
      */
     public synchronized void close() throws IOException {
-        // Kill running tasks
-        while (tasks.size() > 0) {
-            TaskInProgress tip = (TaskInProgress)tasks.get(tasks.firstKey());
+        //
+        // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
+        // because calling jobHasFinished() may result in an edit to 'tasks'.
+        //
+        TreeMap tasksToClose = new TreeMap();
+        tasksToClose.putAll(tasks);
+        for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) {
+            TaskInProgress tip = (TaskInProgress) it.next();
             tip.jobHasFinished();
         }
 
@@ -154,11 +162,21 @@
         } catch (InterruptedException ie) {
         }
 
-        // Shutdown local RPC servers
-        if (taskReportServer != null) {
-            taskReportServer.stop();
-            taskReportServer = null;
-        }
+        //
+        // Shutdown local RPC servers.  Do them
+        // in parallel, as RPC servers can take a long
+        // time to shutdown.  (They need to wait a full
+        // RPC timeout, which might be 10-30 seconds.)
+        //
+        new Thread() {
+            public void run() {
+                if (taskReportServer != null) {
+                    taskReportServer.stop();
+                    taskReportServer = null;
+                }
+            }
+        }.start();
+
         if (mapOutputServer != null) {
             mapOutputServer.stop();
             mapOutputServer = null;
@@ -227,7 +245,7 @@
             //
             // Check if we should create a new Task
             //
-            if (runningTasks.size() < this.maxCurrentTask) {
+            if (runningTasks.size() < maxCurrentTasks) {
                 Task t = jobClient.pollForNewTask(taskTrackerName);
                 if (t != null) {
                     TaskInProgress tip = new TaskInProgress(t, this.fConf);
@@ -255,9 +273,16 @@
             }
 
             //
+            // Check for any Tasks that should be killed, even if
+            // the containing Job is still ongoing.  (This happens
+            // with speculative execution, when one version of the
+            // task finished before another
+            //
+
+            //
             // Check for any Tasks whose job may have ended
             //
-            String toCloseId = jobClient.pollForClosedTask(taskTrackerName);
+            String toCloseId = jobClient.pollForTaskWithClosedJob(taskTrackerName);
             if (toCloseId != null) {
               synchronized (this) {
                 TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId);
@@ -288,7 +313,8 @@
                                 staleState = true;
                             }
                         } catch (Exception ex) {
-                            LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. ex=" + ex + "  Retrying...");
+                            ex.printStackTrace();
+                            LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "].  Retrying...");
                             try {
                                 Thread.sleep(5000);
                             } catch (InterruptedException ie) {
@@ -373,7 +399,7 @@
         /**
          */
         public TaskStatus createStatus() {
-            TaskStatus status = new TaskStatus(task.getTaskId(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? "" : stateString);
+            TaskStatus status = new TaskStatus(task.getTaskId(), task.isMapTask(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? "" : stateString);
             if (diagnosticInfo.length() > 0) {
                 diagnosticInfo = new StringBuffer();
             }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java Thu Feb  2 09:58:38 2006
@@ -77,6 +77,28 @@
     public Iterator taskReports() {
         return taskReports.iterator();
     }
+
+    /**
+     * Return the current MapTask count
+     */
+    public int countMapTasks() {
+        int mapCount = 0;
+        for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
+            TaskStatus ts = (TaskStatus) it.next();
+            if (ts.getIsMap()) {
+                mapCount++;
+            }
+        }
+        return mapCount;
+    }
+
+    /**
+     * Return the current ReduceTask count
+     */
+    public int countReduceTasks() {
+        return taskReports.size() - countMapTasks();
+    }
+
     /**
      */
     public long getLastSeen() {

Added: lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java?rev=374443&view=auto
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java (added)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java Thu Feb  2 09:58:38 2006
@@ -0,0 +1,317 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.mapred;
+
+import org.apache.nutch.fs.*;
+import org.apache.nutch.io.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.lib.*;
+
+import java.io.*;
+import java.util.*;
+import java.math.*;
+
+/**********************************************************
+ * MapredLoadTest generates a bunch of work that exercises
+ * a Nutch Map-Reduce system (and NDFS, too).  It goes through
+ * the following steps:
+ *
+ * 1) Take inputs 'range' and 'counts'.
+ * 2) Generate 'counts' random integers between 0 and range-1.
+ * 3) Create a file that lists each integer between 0 and range-1,
+ *    and lists the number of times that integer was generated.
+ * 4) Emit a (very large) file that contains all the integers
+ *    in the order generated.
+ * 5) After the file has been generated, read it back and count
+ *    how many times each int was generated.
+ * 6) Compare this big count-map against the original one.  If
+ *    they match, then SUCCESS!  Otherwise, FAILURE!
+ *
+ * OK, that's how we can think about it.  What are the map-reduce
+ * steps that get the job done?
+ *
+ * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
+ * 2) In a non-mapread thread, generate the answer-key and write to disk.
+ * 3) In a mapred job, divide the answer key into K jobs.
+ * 4) A mapred 'generator' task consists of K map jobs.  Each reads
+ *    an individual "sub-key", and generates integers according to
+ *    to it (though with a random ordering).
+ * 5) The generator's reduce task agglomerates all of those files
+ *    into a single one.
+ * 6) A mapred 'reader' task consists of M map jobs.  The output
+ *    file is cut into M pieces. Each of the M jobs counts the 
+ *    individual ints in its chunk and creates a map of all seen ints.
+ * 7) A mapred job integrates all the count files into a single one.
+ *
+ **********************************************************/
+public class MapredLoadTest {
+    static class RandomGenMapper implements Mapper {
+        Random r = new Random();
+        public void configure(JobConf job) {
+        }
+
+        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+            int randomVal = ((IntWritable) key).get();
+            int randomCount = ((IntWritable) val).get();
+
+            for (int i = 0; i < randomCount; i++) {
+                out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
+            }
+        }
+    }
+    static class RandomGenReducer implements Reducer {
+        public void configure(JobConf job) {
+        }
+
+        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+            int keyint = ((IntWritable) key).get();
+            while (it.hasNext()) {
+                int val = ((IntWritable) it.next()).get();
+                out.collect(new UTF8("" + val), new UTF8(""));
+            }
+        }
+    }
+    static class RandomCheckMapper implements Mapper {
+        public void configure(JobConf job) {
+        }
+
+        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+            long pos = ((LongWritable) key).get();
+            UTF8 str = (UTF8) val;
+
+            out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
+        }
+    }
+    static class RandomCheckReducer implements Reducer {
+        public void configure(JobConf job) {
+        }
+        
+        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+            int keyint = ((IntWritable) key).get();
+            int count = 0;
+            while (it.hasNext()) {
+                it.next();
+                count++;
+            }
+            out.collect(new IntWritable(keyint), new IntWritable(count));
+        }
+    }
+
+    int range;
+    int counts;
+    Random r = new Random();
+    NutchConf nutchConf;
+
+    /**
+     * MapredLoadTest
+     */
+    public MapredLoadTest(int range, int counts, NutchConf nutchConf) throws IOException {
+        this.range = range;
+        this.counts = counts;
+        this.nutchConf = nutchConf;
+    }
+
+    /**
+     * 
+     */
+    public void launch() throws IOException {
+        //
+        // Generate distribution of ints.  This is the answer key.
+        //
+        int countsToGo = counts;
+        int dist[] = new int[range];
+        for (int i = 0; i < range; i++) {
+            double avgInts = (1.0 * countsToGo) / (range - i);
+            dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
+            countsToGo -= dist[i];
+        }
+        if (countsToGo > 0) {
+            dist[dist.length-1] += countsToGo;
+        }
+
+        //
+        // Write the answer key to a file.  
+        //
+        NutchFileSystem fs = NutchFileSystem.get(nutchConf);
+        File testdir = new File("mapred.loadtest");
+        fs.mkdirs(testdir);
+
+        File randomIns = new File(testdir, "genins");
+        fs.mkdirs(randomIns);
+
+        File answerkey = new File(randomIns, "answer.key");
+        SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey.getPath(), IntWritable.class, IntWritable.class);
+        try {
+            for (int i = 0; i < range; i++) {
+                out.append(new IntWritable(i), new IntWritable(dist[i]));
+            }
+        } finally {
+            out.close();
+        }
+
+        //
+        // Now we need to generate the random numbers according to
+        // the above distribution.
+        //
+        // We create a lot of map tasks, each of which takes at least
+        // one "line" of the distribution.  (That is, a certain number
+        // X is to be generated Y number of times.)
+        //
+        // A map task emits Y key/val pairs.  The val is X.  The key
+        // is a randomly-generated number.
+        //
+        // The reduce task gets its input sorted by key.  That is, sorted
+        // in random order.  It then emits a single line of text that
+        // for the given values.  It does not emit the key.
+        //
+        // Because there's just one reduce task, we emit a single big
+        // file of random numbers.
+        //
+        File randomOuts = new File(testdir, "genouts");
+        fs.mkdirs(randomOuts);
+
+
+        JobConf genJob = new JobConf(nutchConf);
+        genJob.setInputDir(randomIns);
+        genJob.setInputKeyClass(IntWritable.class);
+        genJob.setInputValueClass(IntWritable.class);
+        genJob.setInputFormat(SequenceFileInputFormat.class);
+        genJob.setMapperClass(RandomGenMapper.class);
+
+        genJob.setOutputDir(randomOuts);
+        genJob.setOutputKeyClass(IntWritable.class);
+        genJob.setOutputValueClass(IntWritable.class);
+        genJob.setOutputFormat(TextOutputFormat.class);
+        genJob.setReducerClass(RandomGenReducer.class);
+        genJob.setNumReduceTasks(1);
+
+        JobClient.runJob(genJob);
+
+        //
+        // Next, we read the big file in and regenerate the 
+        // original map.
+        //
+        // We have many map tasks, each of which read at least one
+        // of the output numbers.  For each number read in, the
+        // map task emits a key/value pair where the key is the
+        // number and the value is "1".
+        //
+        // We have a single reduce task, which receives its input
+        // sorted by the key emitted above.  For each key, there will
+        // be a certain number of "1" values.  The reduce task sums
+        // these values to compute how many times the given key was
+        // emitted.
+        //
+        // The reduce task then emits a key/val pair where the key
+        // is the number in question, and the value is the number of
+        // times the key was emitted.  This is the same format as the
+        // original answer key (except that numbers emitted zero times
+        // will not appear in the regenerated key.)
+        //
+        File finalOuts = new File(testdir, "finalouts");
+        fs.mkdirs(finalOuts);
+        JobConf checkJob = new JobConf(nutchConf);
+        checkJob.setInputDir(randomOuts);
+        checkJob.setInputKeyClass(LongWritable.class);
+        checkJob.setInputValueClass(UTF8.class);
+        checkJob.setInputFormat(TextInputFormat.class);
+        checkJob.setMapperClass(RandomCheckMapper.class);
+
+        checkJob.setOutputDir(finalOuts);
+        checkJob.setOutputKeyClass(IntWritable.class);
+        checkJob.setOutputValueClass(IntWritable.class);
+        checkJob.setOutputFormat(SequenceFileOutputFormat.class);
+        checkJob.setReducerClass(RandomCheckReducer.class);
+        checkJob.setNumReduceTasks(1);
+
+        JobClient.runJob(checkJob);
+
+        //
+        // Finally, we compare the reconstructed answer key with the
+        // original one.  Remember, we need to ignore zero-count items
+        // in the original key.
+        //
+        boolean success = true;
+        File recomputedkey = new File(finalOuts, "part-00000");
+        SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey.getPath(), nutchConf);
+        int totalseen = 0;
+        try {
+            IntWritable key = new IntWritable();
+            IntWritable val = new IntWritable();            
+            for (int i = 0; i < range; i++) {
+                if (dist[i] == 0) {
+                    continue;
+                }
+                if (! in.next(key, val)) {
+                    System.err.println("Cannot read entry " + i);
+                    success = false;
+                    break;
+                } else {
+                    if ( !((key.get() == i ) && (val.get() == dist[i]))) {
+                        System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]);
+                        success = false;
+                    }
+                    totalseen += val.get();
+                }
+            }
+            if (success) {
+                if (in.next(key, val)) {
+                    System.err.println("Unnecessary lines in recomputed key!");
+                    success = false;
+                }
+            }
+        } finally {
+            in.close();
+        }
+        int originalTotal = 0;
+        for (int i = 0; i < dist.length; i++) {
+            originalTotal += dist[i];
+        }
+        System.out.println("Original sum: " + originalTotal);
+        System.out.println("Recomputed sum: " + totalseen);
+
+        //
+        // Write to "results" whether the test succeeded or not.
+        //
+        File resultFile = new File(testdir, "results");
+        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
+        try {
+            bw.write("Success=" + success + "\n");
+            System.out.println("Success=" + success);            
+        } finally {
+            bw.close();
+        }
+    }
+
+    /**
+     * Launches all the tasks in order.
+     */
+    public static void main(String[] argv) throws Exception {
+        if (argv.length < 2) {
+            System.err.println("Usage: MapredLoadTest <range> <counts>");
+            System.err.println();
+            System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
+            return;
+        }
+
+        int i = 0;
+        int range = Integer.parseInt(argv[i++]);
+        int counts = Integer.parseInt(argv[i++]);
+
+        MapredLoadTest mlt = new MapredLoadTest(range, counts, new NutchConf());
+        mlt.launch();
+    }
+}

Modified: lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp (original)
+++ lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp Thu Feb  2 09:58:38 2006
@@ -9,21 +9,28 @@
 <%
   String jobid = request.getParameter("jobid");
   JobTracker tracker = JobTracker.getTracker();
-  JobTracker.JobInProgress job = (JobTracker.JobInProgress) tracker.getJob(jobid);
+  JobInProgress job = (JobInProgress) tracker.getJob(jobid);
   JobProfile profile = (job != null) ? (job.getProfile()) : null;
   JobStatus status = (job != null) ? (job.getStatus()) : null;
 
-  Vector mapTaskReports[] = tracker.getMapTaskReport(jobid);
-  Vector reduceTaskReports[] = tracker.getReduceTaskReport(jobid);
+  Vector mapTaskReports[] = (job != null) ? tracker.getMapTaskReport(jobid) : null;
+  Vector reduceTaskReports[] = (job != null) ? tracker.getReduceTaskReport(jobid) : null;
 %>
 
 <html>
 <title>Nutch MapReduce Job Details</title>
 <body>
+<%
+  if (job == null) {
+    %>
+    No job found<br>
+    <%
+  } else {
+    %>
 <h1>Job '<%=jobid%>'</h1>
 
 <b>Job File:</b> <%=profile.getJobFile()%><br>
-<b>Start time:</b> <%= new Date(job.getStartTime())%><br>
+<b>The job started at:</b> <%= new Date(job.getStartTime())%><br>
 <%
   if (status.getRunState() == JobStatus.RUNNING) {
     out.print("The job is still running.<br>\n");
@@ -38,19 +45,28 @@
 <h2>Map Tasks</h2>
   <center>
   <table border=2 cellpadding="5" cellspacing="2">
-  <tr><td align="center">Map Task Id</td><td>Pct Complete</td><td>State</td><td>Diagnostic Text</td></tr>
+  <tr><td align="center">Map Task Id</td><td>Pct Complete</td><td>Diagnostic Data</td></tr>
 
   <%
     for (int i = 0; i < mapTaskReports.length; i++) {
       Vector v = mapTaskReports[i];
-      out.print("<tr><td>" + v.elementAt(0) + "</td><td>" + v.elementAt(1) + "</td><td>" + v.elementAt(2) + "</td>");
-      if (v.size() == 3) {
-        out.print("<td></td>");
-      } else {
-        for (int j = 3; j < v.size(); j++) {
-          out.print("<td>" + v.elementAt(j) + "</td>");
+      String tipid = (String) v.elementAt(0);
+      String progress = (String) v.elementAt(1);
+      int diagnosticSize = ((Integer) v.elementAt(2)).intValue();
+
+      out.print("<tr><td>" + tipid + "</td><td>" + progress + "</td><td>");
+      for (int j = 0; j < diagnosticSize; j++) {
+        Vector taskData = (Vector) v.elementAt(3 + ((2 * j)));
+        String taskStateString = (String) v.elementAt(3 + ((2 * j) + 1));
+        out.print(taskStateString);
+        out.print("<b>");
+
+        for (Iterator it2 = taskData.iterator(); it2.hasNext(); ) {
+          out.print("" + it2.next());
+          out.println("<b>");
         }
       }
+      out.print("</td>");
       out.print("</tr>\n");
     }
   %>
@@ -62,25 +78,36 @@
 <h2>Reduce Tasks</h2>
   <center>
   <table border=2 cellpadding="5" cellspacing="2">
-  <tr><td align="center">Reduce Task Id</td><td>Pct Complete</td><td>State</td><td>Diagnostic Text</td></tr>
+  <tr><td align="center">Reduce Task Id</td><td>Pct Complete</td><td>Diagnostic Data</td></tr>
 
   <%
     for (int i = 0; i < reduceTaskReports.length; i++) {
       Vector v = reduceTaskReports[i];
-      out.print("<tr><td>" + v.elementAt(0) + "</td><td>" + v.elementAt(1) + "</td><td>" + v.elementAt(2) + "</td>");
-      if (v.size() == 3) {
-        out.print("<td></td>");
-      } else {
-        for (int j = 3; j < v.size(); j++) {
-          out.print("<td>" + v.elementAt(j) + "</td>");
+      String tipid = (String) v.elementAt(0);
+      String progress = (String) v.elementAt(1);
+      int diagnosticSize = ((Integer) v.elementAt(2)).intValue();
+
+      out.print("<tr><td>" + tipid + "</td><td>" + progress + "</td><td>");
+      for (int j = 0; j < diagnosticSize; j++) {
+        Vector taskData = (Vector) v.elementAt(3 + ((2 * j)));
+        String taskStateString = (String) v.elementAt(3 + ((2 * j) + 1));
+        out.print(taskStateString);
+        out.print("<b>");
+
+        for (Iterator it2 = taskData.iterator(); it2.hasNext(); ) {
+          out.print("" + it2.next());
+          out.println("<b>");
         }
       }
+      out.print("</td>");
       out.print("</tr>\n");
     }
   %>
   </table>
   </center>
-
+  <%
+  }
+%>
 
 <hr>
 <a href="/jobtracker.jsp">Go back to JobTracker</a><br>

Modified: lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp (original)
+++ lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp Thu Feb  2 09:58:38 2006
@@ -46,22 +46,22 @@
       out.print("<tr><td align=\"center\" colspan=\"8\"><b>" + label + " Jobs </b></td></tr>\n");
 
       if (jobs.size() > 0) {
-        out.print("<tr><td><b>Jobid</b></td><td><b>% complete</b></td><td><b>Required maps</b></td><td><b>maps attempted</b></td><td><b>maps completed</b></td><td><b>Required reduces</b></td><td><b>reduces attempted</b></td><td><b>reduces completed</b></td></tr>\n");
+        out.print("<tr><td><b>Jobid</b></td><td><b>% complete</b></td><td><b>Required maps</b></td><td><b>maps completed</b></td><td><b>Required reduces</b></td><td><b>reduces completed</b></td></tr>\n");
         for (Iterator it = jobs.iterator(); it.hasNext(); ) {
-          JobTracker.JobInProgress job = (JobTracker.JobInProgress) it.next();
+          JobInProgress job = (JobInProgress) it.next();
           JobProfile profile = job.getProfile();
           JobStatus status = job.getStatus();
 
           String jobid = profile.getJobId();
-          float completedRatio = (100 * job.completedRatio());
+          double completedRatio = (0.5 * (100 * status.mapProgress())) +
+                                 (0.5 * (100 * status.reduceProgress()));
+
           int desiredMaps = job.desiredMaps();
-          int attemptedMaps = job.attemptedMaps();
-          int completedMaps = job.completedMaps();
           int desiredReduces = job.desiredReduces();
-          int attemptedReduces = job.attemptedReduces();
-          int completedReduces = job.completedReduces();
+          int completedMaps = job.finishedMaps();
+          int completedReduces = job.finishedReduces();
 
-          out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + jobid + "</a></td><td>" + completedRatio + "%</td><td>" + desiredMaps + "</td><td>" + attemptedMaps + "</td><td>" + completedMaps + "</td><td>" + desiredReduces + "</td><td>" + attemptedReduces + "</td><td> " + completedReduces + "</td></tr>\n");
+          out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + jobid + "</a></td><td>" + completedRatio + "%</td><td>" + desiredMaps + "</td><td>" + completedMaps + "</td><td>" + desiredReduces + "</td><td> " + completedReduces + "</td></tr>\n");
         }
       } else {
         out.print("<tr><td align=\"center\" colspan=\"8\"><i>none</i></td></tr>\n");