Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/29 22:04:18 UTC

svn commit: r501182 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Mon Jan 29 13:04:17 2007
New Revision: 501182

URL: http://svn.apache.org/viewvc?view=rev&rev=501182
Log:
HADOOP-949.  svn merge -r 500410:500397, reverting a patch from HADOOP-248, which was causing MapReduce programs to hang.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jan 29 13:04:17 2007
@@ -87,8 +87,7 @@
     with different versions of Lucene without worrying about CLASSPATH
     order.  (Milind Bhandarkar via cutting)
 
-27. HADOOP-248.  Optimize location of map outputs to no longer use
-    random probes.  (Devaraj Das via cutting)
+27. [ intentionally blank ]
 
 28. HADOOP-227.  Add support for backup namenodes, which periodically
     get snapshots of the namenode state.  (Dhruba Borthakur via cutting) 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Mon Jan 29 13:04:17 2007
@@ -28,10 +28,11 @@
  */ 
 interface InterTrackerProtocol extends VersionedProtocol {
   /**
-   * version 4 introduced that removes locateMapOutputs and instead uses
-   * getTaskCompletionEvents to figure finished maps and fetch the outputs
+   * version 3 introduced to replace 
+   * emitHeartbeat/pollForNewTask/pollForTaskWithClosedJob with
+   * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)}
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 3L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -61,6 +62,18 @@
           boolean initialContact, boolean acceptNewTasks, short responseId)
   throws IOException;
 
+  /** Called by a reduce task to find which map tasks are completed.
+   *
+   * @param jobId the job id
+   * @param mapTasksNeeded an array of the mapIds that we need
+   * @param partition the reduce's id
+   * @return an array of MapOutputLocation
+   */
+  MapOutputLocation[] locateMapOutputs(String jobId, 
+                                       int[] mapTasksNeeded,
+                                       int partition
+                                       ) throws IOException;
+
   /**
    * The task tracker calls this once, to discern where it can find
    * files referred to by the JobTracker
@@ -83,12 +96,11 @@
    * Returns an empty array if no events are available. 
    * @param jobid job id 
    * @param fromEventId event id to start from. 
-   * @param maxEvents the max number of events we want to look at
    * @return array of task completion events. 
    * @throws IOException
    */
   TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId, int maxEvents) throws IOException;
+      String jobid, int fromEventId) throws IOException; 
   
 }
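
Under the restored protocol version the reduce side asks the JobTracker directly for the locations of the map outputs it still needs. A minimal sketch of a caller, assuming a connected InterTrackerProtocol RPC proxy; the helper name is illustrative, while locateMapOutputs, getMapId() and getHost() all appear in the patch below:

    // Illustrative only: ask the JobTracker where the still-needed map
    // outputs live.  Maps that have not finished are simply absent from
    // the returned array, so the caller polls again later.
    MapOutputLocation[] pollOnce(InterTrackerProtocol jobClient,
                                 String jobId,
                                 int[] neededMapIds,
                                 int reducePartition) throws IOException {
      MapOutputLocation[] locs =
          jobClient.locateMapOutputs(jobId, neededMapIds, reducePartition);
      for (int i = 0; i < locs.length; i++) {
        // each location pairs a map id with the tracker host that serves
        // the output over HTTP
        System.out.println(locs[i].getMapId() + " @ " + locs[i].getHost());
      }
      return locs;
    }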
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Jan 29 13:04:17 2007
@@ -157,7 +157,7 @@
         public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
             int startFrom) throws IOException{
           return jobSubmitClient.getTaskCompletionEvents(
-              getJobID(), startFrom, 10); 
+              getJobID(), startFrom); 
         }
 
         /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Jan 29 13:04:17 2007
@@ -291,40 +291,24 @@
           
           if (state == TaskStatus.State.SUCCEEDED) {
             this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker, 
-                status.getTaskId(),
-                tip.idWithinJob(),
-                status.getIsMap(),
+                taskCompletionEventTracker++, 
+                status.getTaskId(), 
                 TaskCompletionEvent.Status.SUCCEEDED,
                 httpTaskLogLocation ));
-            tip.setSuccessEventNumber(taskCompletionEventTracker);
             completedTask(tip, status, metrics);
           } else if (state == TaskStatus.State.FAILED ||
                      state == TaskStatus.State.KILLED) {
             this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker, 
-                status.getTaskId(),
-                tip.idWithinJob(),
-                status.getIsMap(),
+                taskCompletionEventTracker++, 
+                status.getTaskId(), 
                 TaskCompletionEvent.Status.FAILED, 
                 httpTaskLogLocation ));
-            // Get the event number for the (possibly) previously successful
-            // task. If there exists one, then set that status to OBSOLETE 
-            int eventNumber;
-            if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
-              TaskCompletionEvent t = 
-                this.taskCompletionEvents.get(eventNumber);
-              if (t.getTaskId().equals(status.getTaskId()))
-                t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
-            }
             // Tell the job to fail the relevant task
             failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
                        wasRunning, wasComplete);
           }          
         }
 
-        taskCompletionEventTracker++;
-        
         //
         // Update JobInProgress status
         //
@@ -794,14 +778,12 @@
        return null;
     }
     
-    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId, 
-        int maxEvents) {
+    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId) {
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       if( taskCompletionEvents.size() > fromEventId) {
-        int actualMax = Math.min(maxEvents, 
-            (taskCompletionEvents.size() - fromEventId));
         events = (TaskCompletionEvent[])taskCompletionEvents.subList(
-            fromEventId, actualMax + fromEventId).toArray(events);        
+            fromEventId, taskCompletionEvents.size()).
+            toArray(events);        
       }
       return events; 
     }
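
With maxEvents gone, the method above hands back every completion event from fromEventId to the end of the list rather than a bounded page. The slice-and-copy idiom in isolation, as a self-contained sketch (plain java.util types stand in for the Hadoop ones):

    import java.util.Arrays;
    import java.util.List;

    class EventSlice {
      // Return everything from fromEventId onward, or an empty array if
      // the caller is already caught up.
      static String[] eventsFrom(List<String> events, int fromEventId) {
        if (events.size() <= fromEventId) {
          return new String[0];
        }
        // subList is a live view; toArray copies it out
        return events.subList(fromEventId, events.size())
                     .toArray(new String[0]);
      }

      public static void main(String[] args) {
        List<String> evs = Arrays.asList("e0", "e1", "e2", "e3");
        System.out.println(Arrays.toString(eventsFrom(evs, 2))); // [e2, e3]
      }
    }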

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Mon Jan 29 13:04:17 2007
@@ -85,12 +85,11 @@
      * Get task completion events for the jobid, starting from fromEventId. 
      * Returns an empty array if no events are available. 
      * @param jobid job id 
-     * @param fromEventId event id to start from.
-     * @param maxEvents the max number of events we want to look at 
+     * @param fromEventId event id to start from. 
      * @return array of task completion events. 
      * @throws IOException
      */
     public TaskCompletionEvent[] getTaskCompletionEvents(
-              String jobid, int fromEventId, int maxEvents) throws IOException;
+                String jobid, int fromEventId) throws IOException;
     
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jan 29 13:04:17 2007
@@ -1263,6 +1263,48 @@
     }
 
     /**
+     * A TaskTracker wants to know the physical locations of completed, but not
+     * yet closed, tasks.  This exists so the reduce task thread can locate
+     * map task outputs.
+     */
+    public synchronized MapOutputLocation[] 
+             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) 
+    throws IOException {
+        // Check to make sure that the job hasn't 'completed'.
+        JobInProgress job = getJob(jobId);
+        if (job.status.getRunState() != JobStatus.RUNNING) {
+          return new MapOutputLocation[0];
+        }
+        
+        ArrayList result = new ArrayList(mapTasksNeeded.length);
+        for (int i = 0; i < mapTasksNeeded.length; i++) {
+          TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
+          if (status != null) {
+             String trackerId = 
+               (String) taskidToTrackerMap.get(status.getTaskId());
+             // Safety check, if we can't find the taskid in 
+             // taskidToTrackerMap and job isn't 'running', then just
+             // return an empty array
+             if (trackerId == null && 
+                     job.status.getRunState() != JobStatus.RUNNING) {
+               return new MapOutputLocation[0];
+             }
+             
+             TaskTrackerStatus tracker;
+             synchronized (taskTrackers) {
+               tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
+             }
+             result.add(new MapOutputLocation(status.getTaskId(), 
+                                              mapTasksNeeded[i],
+                                              tracker.getHost(), 
+                                              tracker.getHttpPort()));
+          }
+        }
+        return (MapOutputLocation[]) 
+               result.toArray(new MapOutputLocation[result.size()]);
+    }
+
+    /**
      * Grab the local fs name
      */
     public synchronized String getFilesystemName() throws IOException {
@@ -1393,14 +1435,14 @@
     /* 
      * Returns a list of TaskCompletionEvent for the given job, 
      * starting from fromEventId.
-     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
+     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int)
      */
     public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
-        String jobid, int fromEventId, int maxEvents) throws IOException{
+        String jobid, int fromEventId) throws IOException{
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       JobInProgress job = (JobInProgress)this.jobs.get(jobid);
       if (null != job) {
-        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+        events = job.getTaskCompletionEvents(fromEventId);
       }
       return events;
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Jan 29 13:04:17 2007
@@ -244,7 +244,7 @@
 
   public JobStatus[] jobsToComplete() {return null;}
   public TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId, int maxEvents) throws IOException{
+      String jobid, int fromEventId) throws IOException{
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
   

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon Jan 29 13:04:17 2007
@@ -122,19 +122,9 @@
   
   /**
    * the number of map output locations to poll for at one time
-   */  
-  private int probe_sample_size = 50;
-  
-  /**
-   * a Random used during the map output fetching
-   */
-  private Random randForProbing;
-  
-  /**
-   * a hashmap from mapId to MapOutputLocation for retrials
    */
-  private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
-  
+  private static final int PROBE_SAMPLE_SIZE = 50;
+
   /** Represents the result of an attempt to copy a map output */
   private class CopyResult {
     
@@ -378,10 +368,6 @@
     Random         backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();
     
-    //tweak the probe sample size (make it a function of numCopiers)
-    probe_sample_size = Math.max(numCopiers*5, 50);
-    randForProbing = new Random(reduceTask.getPartition() * 100);
-    
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
       copyPhase.addPhase();       // add sub-phase per file
@@ -399,8 +385,6 @@
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
     long currentTime = startTime;
-    int fromEventId = 0;
-    
     PingTimer pingTimer = new PingTimer();
     pingTimer.setName("Map output copy reporter for task " + 
                       reduceTask.getTaskId());
@@ -417,36 +401,17 @@
         LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                  " map output location(s)");
         try {
-          // the call to queryJobTracker will modify fromEventId to a value
-          // that it should be for the next call to queryJobTracker
-          MapOutputLocation[] locs = queryJobTracker(fromEventId, jobClient);
+          MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);
           
           // remove discovered outputs from needed list
           // and put them on the known list
-          int gotLocs = (locs == null ? 0 : locs.length);
           for (int i=0; i < locs.length; i++) {
-            // check whether we actually need an output. It could happen
-            // that a map task that successfully ran earlier got lost, but
-            // if we already have copied the output of that unfortunate task
-            // we need not copy it again from the new TT (we will ignore 
-            // the event for the new rescheduled execution)
-            if(neededOutputs.remove(new Integer(locs[i].getMapId()))) {
-              // remove the mapId from the retryFetches hashmap since we now
-              // prefer the new location instead of what we saved earlier
-              retryFetches.remove(new Integer(locs[i].getMapId()));
-              knownOutputs.add(locs[i]);
-              gotLocs--;
-            }
-            
+            neededOutputs.remove(new Integer(locs[i].getMapId()));
+            knownOutputs.add(locs[i]);
           }
-          // now put the remaining hash entries for the failed fetches 
-          // and clear the hashmap
-          knownOutputs.addAll(retryFetches.values());
           LOG.info(reduceTask.getTaskId() +
-                " Got " + gotLocs + 
-                " new map outputs from jobtracker and " + retryFetches.size() +
-                " map outputs from previous failures");
-          retryFetches.clear();
+                   " Got " + (locs == null ? 0 : locs.length) + 
+                   " map outputs from jobtracker");
         }
         catch (IOException ie) {
           LOG.warn(reduceTask.getTaskId() +
@@ -520,7 +485,6 @@
           } else {
             // this copy failed, put it back onto neededOutputs
             neededOutputs.add(new Integer(cr.getMapId()));
-            retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
           
             // wait a random amount of time for next contact
             currentTime = System.currentTimeMillis();
@@ -540,7 +504,6 @@
             while (locIt.hasNext()) {
               MapOutputLocation loc = (MapOutputLocation)locIt.next();
               if (cr.getHost().equals(loc.getHost())) {
-                retryFetches.put(new Integer(loc.getMapId()), loc);
                 locIt.remove();
                 neededOutputs.add(new Integer(loc.getMapId()));
               }
@@ -551,7 +514,7 @@
         }
         
         // ensure we have enough to keep us busy
-        if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
+        if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
           break;
         }
       }
@@ -645,16 +608,28 @@
   }
   
   /** Queries the job tracker for a set of outputs ready to be copied
-   * @param fromEventId the first event ID we want to start from, this will be
-   * modified by the call to this method
+   * @param neededOutputs the list of currently unknown outputs
    * @param jobClient the job tracker
    * @return a set of locations to copy outputs from
    * @throws IOException
    */  
-  private MapOutputLocation[] queryJobTracker(int fromEventId, 
+  private MapOutputLocation[] queryJobTracker(List neededOutputs,
                                               InterTrackerProtocol jobClient)
   throws IOException {
     
+    // query for just a random subset of needed segments so that we don't
+    // overwhelm the jobtracker.  Ideally we could send a more compact
+    // representation of all that is needed, i.e., a bit-vector
+    int     checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
+    int     neededIds[] = new int[checkSize];
+      
+    Collections.shuffle(neededOutputs);
+      
+    ListIterator itr = neededOutputs.listIterator();
+    for (int i=0; i < checkSize; i++) {
+      neededIds[i] = ((Integer)itr.next()).intValue();
+    }
+
     long currentTime = System.currentTimeMillis();    
     long pollTime = lastPollTime + MIN_POLL_INTERVAL;
     while (currentTime < pollTime) {
@@ -665,28 +640,9 @@
     }
     lastPollTime = currentTime;
 
-    TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
-                                      reduceTask.getJobId().toString(),
-                                      fromEventId,
-                                      probe_sample_size);
-    
-    List <MapOutputLocation> mapOutputsList = new ArrayList();
-    for (int i = 0; i < t.length; i++) {
-      if (t[i].isMap && 
-          t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
-        URI u = URI.create(t[i].getTaskTrackerHttp());
-        String host = u.getHost();
-        int port = u.getPort();
-        String taskId = t[i].getTaskId();
-        int mId = t[i].idWithinJob();
-        mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
-      }
-    }
-    Collections.shuffle(mapOutputsList, randForProbing);
-    MapOutputLocation[] locations =
-                        new MapOutputLocation[mapOutputsList.size()];
-    fromEventId += t.length;
-    return mapOutputsList.toArray(locations);
+    return jobClient.locateMapOutputs(reduceTask.getJobId().toString(), 
+                                      neededIds,
+                                      reduceTask.getPartition());
   }
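
The PROBE_SAMPLE_SIZE cap above keeps any single poll small: the needed list is shuffled and only the first 50 ids are sent, so a reduce waiting on thousands of maps never floods the JobTracker. The sampling step on its own, as a runnable sketch:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class ProbeSample {
      static final int PROBE_SAMPLE_SIZE = 50;

      // Shuffle in place, then take at most PROBE_SAMPLE_SIZE ids to ask
      // the JobTracker about -- mirrors the loop in queryJobTracker above.
      static int[] sample(List<Integer> neededOutputs) {
        Collections.shuffle(neededOutputs);
        int checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
        int[] neededIds = new int[checkSize];
        for (int i = 0; i < checkSize; i++) {
          neededIds[i] = neededOutputs.get(i).intValue();
        }
        return neededIds;
      }

      public static void main(String[] args) {
        List<Integer> needed = new ArrayList<Integer>();
        for (int i = 0; i < 200; i++) needed.add(new Integer(i));
        System.out.println(sample(needed).length); // prints 50
      }
    }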
 
   

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Mon Jan 29 13:04:17 2007
@@ -12,14 +12,12 @@
  *
  */
 public class TaskCompletionEvent implements Writable{
-    static public enum Status {FAILED, SUCCEEDED, OBSOLETE};
+    static public enum Status {FAILED, SUCCEEDED};
     
     private int eventId ; 
     private String taskTrackerHttp ;
     private String taskId ;
     Status status ; 
-    boolean isMap = false ;
-    private int idWithinJob;
     public static final TaskCompletionEvent[] EMPTY_ARRAY = 
         new TaskCompletionEvent[0];
     /**
@@ -37,15 +35,11 @@
      * @param taskTrackerHttp task tracker's host:port for http. 
      */
     public TaskCompletionEvent(int eventId, 
-        String taskId,
-        int idWithinJob,
-        boolean isMap,
+        String taskId, 
         Status status, 
         String taskTrackerHttp){
       
-        this.taskId = taskId ;
-        this.idWithinJob = idWithinJob ;
-        this.isMap = isMap ;
+        this.taskId = taskId ; 
         this.eventId = eventId ; 
         this.status =status ; 
         this.taskTrackerHttp = taskTrackerHttp ;
@@ -120,28 +114,17 @@
         return buf.toString();
     }
     
-    public boolean isMapTask() {
-        return isMap;
-    }
-    
-    public int idWithinJob() {
-      return idWithinJob;
-    }
     //////////////////////////////////////////////
     // Writable
     //////////////////////////////////////////////
     public void write(DataOutput out) throws IOException {
         WritableUtils.writeString(out, taskId); 
-        WritableUtils.writeVInt(out, idWithinJob);
-        out.writeBoolean(isMap);
         WritableUtils.writeEnum(out, status); 
         WritableUtils.writeString(out, taskTrackerHttp);
     }
   
     public void readFields(DataInput in) throws IOException {
         this.taskId = WritableUtils.readString(in) ; 
-        this.idWithinJob = WritableUtils.readVInt(in);
-        this.isMap = in.readBoolean();
         this.status = WritableUtils.readEnum(in, Status.class);
         this.taskTrackerHttp = WritableUtils.readString(in);
     }
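
After the revert the wire format is three fields: taskId, status, and taskTrackerHttp (note that eventId is not serialized). A round-trip sketch through plain byte streams, assuming the class keeps a no-arg constructor for Writable deserialization:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class RoundTrip {
      // Serialize a TaskCompletionEvent the way Hadoop RPC would:
      // write() to a DataOutput, then readFields() from a DataInput.
      static TaskCompletionEvent copyViaBytes(TaskCompletionEvent ev)
          throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        ev.write(new DataOutputStream(buf));

        TaskCompletionEvent out = new TaskCompletionEvent(); // no-arg ctor assumed
        out.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        return out;  // carries taskId, status, taskTrackerHttp; not eventId
      }
    }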

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Jan 29 13:04:17 2007
@@ -60,7 +60,6 @@
     private JobInProgress job;
 
     // Status of the TIP
-    private int successEventNumber = -1;
     private int numTaskFailures = 0;
     private double progress = 0;
     private String state = "";
@@ -140,15 +139,6 @@
     }
     
     /**
-     * Return the index of the tip within the job, so "tip_0002_m_012345"
-     * would return 12345;
-     * @return int the tip index
-     */
-     public int idWithinJob() {
-       return partition;
-     }    
-
-    /**
      * Initialization common to Map and Reduce
      */
     void init(String jobUniqueString) {
@@ -577,19 +567,5 @@
      */
     public int getIdWithinJob() {
       return partition;
-    }
-    
-    /**
-     * Set the event number that was raised for this tip
-     */
-    public void setSuccessEventNumber(int eventNumber) {
-      successEventNumber = eventNumber;
-    }
-       
-    /**
-     * Get the event number that was raised for this tip
-     */
-    public int getSuccessEventNumber() {
-      return successEventNumber;
     }
 }