You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/12/07 08:55:40 UTC

svn commit: r1042928 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Tue Dec  7 07:55:39 2010
New Revision: 1042928

URL: http://svn.apache.org/viewvc?rev=1042928&view=rev
Log:
All taskid variables should be declared as type TaskAttemptID

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Dec  7 07:55:39 2010
@@ -50,6 +50,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-336: All taskid variables should be declared as type TaskAttemptID
+                       (edwardyoon)
     HAMA-331: Removing JobInProgressListener 
                        and adding JobInitThread to BSPMaster (edwardyoon)
     HAMA-334: Removing "java5.home" env key from build script. (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Tue Dec  7 07:55:39 2010
@@ -67,7 +67,7 @@ public class BSPMaster implements JobSub
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
   public static final long GROOMSERVER_EXPIRY_INTERVAL = 10 * 60 * 1000;
   static long JOBINIT_SLEEP_INTERVAL = 2000;
-  
+
   // States
   State state = State.INITIALIZING;
 
@@ -102,9 +102,9 @@ public class BSPMaster implements JobSub
   private Map<BSPJobID, JobInProgress> jobs = new TreeMap<BSPJobID, JobInProgress>();
   private TaskScheduler taskScheduler;
 
-  TreeMap<String, String> taskIdToGroomNameMap = new TreeMap<String, String>();
-  TreeMap<String, TreeSet<String>> groomNameToTaskIdsMap = new TreeMap<String, TreeSet<String>>();
-  Map<String, TaskInProgress> taskIdToTaskInProgressMap = new TreeMap<String, TaskInProgress>();
+  TreeMap<TaskAttemptID, String> taskIdToGroomNameMap = new TreeMap<TaskAttemptID, String>();
+  TreeMap<String, TreeSet<TaskAttemptID>> groomNameToTaskIdsMap = new TreeMap<String, TreeSet<TaskAttemptID>>();
+  Map<TaskAttemptID, TaskInProgress> taskIdToTaskInProgressMap = new TreeMap<TaskAttemptID, TaskInProgress>();
 
   Vector<JobInProgress> jobInitQueue = new Vector<JobInProgress>();
   JobInitThread initJobs = new JobInitThread();
@@ -195,43 +195,45 @@ public class BSPMaster implements JobSub
     return activeGrooms;
   }
 
-  /////////////////////////////////////////////////////////////////
-  //  Used to init new jobs that have just been created
-  /////////////////////////////////////////////////////////////////
+  // ///////////////////////////////////////////////////////////////
+  // Used to init new jobs that have just been created
+  // ///////////////////////////////////////////////////////////////
   class JobInitThread implements Runnable {
     private volatile boolean shouldRun = true;
-    
-      public JobInitThread() {
-      }
-      public void run() {
-          while (shouldRun) {
-              JobInProgress job = null;
-              synchronized (jobInitQueue) {
-                  if (jobInitQueue.size() > 0) {
-                      job = (JobInProgress) jobInitQueue.elementAt(0);
-                      jobInitQueue.remove(job);
-                  } else {
-                      try {
-                          jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
-                      } catch (InterruptedException iex) {
-                      }
-                  }
-              }
-              try {
-                  if (job != null) {
-                      job.initTasks();
-                  }
-              } catch (Exception e) {
-                  LOG.warn("job init failed: " + e);
-                  job.kill();
-              }
+
+    public JobInitThread() {
+    }
+
+    public void run() {
+      while (shouldRun) {
+        JobInProgress job = null;
+        synchronized (jobInitQueue) {
+          if (jobInitQueue.size() > 0) {
+            job = (JobInProgress) jobInitQueue.elementAt(0);
+            jobInitQueue.remove(job);
+          } else {
+            try {
+              jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
+            } catch (InterruptedException iex) {
+            }
           }
+        }
+        try {
+          if (job != null) {
+            job.initTasks();
+          }
+        } catch (Exception e) {
+          LOG.warn("job init failed: " + e);
+          job.kill();
+        }
       }
-      public void stopIniter() {
-          shouldRun = false;
-      }
+    }
+
+    public void stopIniter() {
+      shouldRun = false;
+    }
   }
-  
+
   // /////////////////////////////////////////////////////////////
   // BSPMaster methods
   // /////////////////////////////////////////////////////////////
@@ -303,7 +305,7 @@ public class BSPMaster implements JobSub
   public void offerService() throws InterruptedException, IOException {
     new Thread(this.initJobs).start();
     LOG.info("Starting jobInitThread");
-    
+
     this.interTrackerServer.start();
 
     synchronized (this) {
@@ -355,11 +357,12 @@ public class BSPMaster implements JobSub
         groomToHeartbeatResponseMap.remove(groomName);
       }
       return new HeartbeatResponse(newResponseId,
-          new GroomServerAction[] { new ReinitGroomAction() },
-          Collections.<String, String>emptyMap());
+          new GroomServerAction[] { new ReinitGroomAction() }, Collections
+              .<String, String> emptyMap());
     }
 
-    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null, groomServerPeers);
+    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null,
+        groomServerPeers);
     List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
 
     // Check for new tasks to be executed on the groom server
@@ -372,7 +375,7 @@ public class BSPMaster implements JobSub
 
         for (Task task : taskList) {
           if (task != null) {
-                actions.add(new LaunchTaskAction(task));
+            actions.add(new LaunchTaskAction(task));
           }
         }
       }
@@ -383,7 +386,7 @@ public class BSPMaster implements JobSub
     groomToHeartbeatResponseMap.put(groomName, response);
     removeMarkedTasks(groomName);
     updateTaskStatuses(status);
-    
+
     return response;
   }
 
@@ -391,8 +394,9 @@ public class BSPMaster implements JobSub
     for (Iterator<TaskStatus> it = status.taskReports(); it.hasNext();) {
       TaskStatus report = it.next();
       report.setGroomServer(status.getGroomName());
-      String taskId = report.getTaskId();
-      TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap.get(taskId);
+      TaskAttemptID taskId = report.getTaskId();
+      TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap
+          .get(taskId);
 
       if (tip == null) {
         LOG.info("Serious problem.  While updating status, cannot find taskid "
@@ -414,14 +418,14 @@ public class BSPMaster implements JobSub
   }
 
   // (trackerID -> TreeSet of completed taskids running at that tracker)
-  TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap<String, Set<String>>();
+  TreeMap<String, TreeSet<TaskAttemptID>> trackerToMarkedTasksMap = new TreeMap<String, TreeSet<TaskAttemptID>>();
 
   private void removeMarkedTasks(String groomName) {
     // Purge all the 'marked' tasks which were running at taskTracker
-    TreeSet<String> markedTaskSet = (TreeSet<String>) trackerToMarkedTasksMap
+    TreeSet<TaskAttemptID> markedTaskSet = trackerToMarkedTasksMap
         .get(groomName);
     if (markedTaskSet != null) {
-      for (String taskid : markedTaskSet) {
+      for (TaskAttemptID taskid : markedTaskSet) {
         removeTaskEntry(taskid);
         LOG.info("Removed completed task '" + taskid + "' from '" + groomName
             + "'");
@@ -431,13 +435,13 @@ public class BSPMaster implements JobSub
     }
   }
 
-  private void removeTaskEntry(String taskid) {
+  private void removeTaskEntry(TaskAttemptID taskid) {
     // taskid --> groom
     String groom = taskIdToGroomNameMap.remove(taskid);
 
     // groom --> taskid
     if (groom != null) {
-      TreeSet<String> groomSet = groomNameToTaskIdsMap.get(groom);
+      TreeSet<TaskAttemptID> groomSet = groomNameToTaskIdsMap.get(groom);
       if (groomSet != null) {
         groomSet.remove(taskid);
       }
@@ -449,12 +453,13 @@ public class BSPMaster implements JobSub
   }
 
   private List<GroomServerAction> getTasksToKill(String groomName) {
-    Set<String> taskIds = (TreeSet<String>) groomNameToTaskIdsMap.get(groomName);
+    Set<TaskAttemptID> taskIds = groomNameToTaskIdsMap.get(groomName);
     if (taskIds != null) {
       List<GroomServerAction> killList = new ArrayList<GroomServerAction>();
       Set<String> killJobIds = new TreeSet<String>();
-      for (String killTaskId : taskIds) {
-        TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap.get(killTaskId);
+      for (TaskAttemptID killTaskId : taskIds) {
+        TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap
+            .get(killTaskId);
         if (tip.shouldCloseForClosedJob(killTaskId)) {
           // 
           // This is how the BSPMaster ends a task at the GroomServer.
@@ -482,7 +487,7 @@ public class BSPMaster implements JobSub
     return null;
 
   }
-  
+
   /**
    * Process incoming heartbeat messages from the groom.
    */
@@ -499,7 +504,8 @@ public class BSPMaster implements JobSub
     }
 
     if (initialContact) {
-      groomServerPeers.put(groomStatus.getGroomName(), groomStatus.getPeerName());
+      groomServerPeers.put(groomStatus.getGroomName(), groomStatus
+          .getPeerName());
     }
 
     return true;
@@ -571,7 +577,7 @@ public class BSPMaster implements JobSub
         jobInitQueue.notifyAll();
       }
     }
-    
+
     return job.getStatus();
   }
 
@@ -686,7 +692,7 @@ public class BSPMaster implements JobSub
     this.interTrackerServer.stop();
   }
 
-  public void createTaskEntry(String taskid, String groomServer,
+  public void createTaskEntry(TaskAttemptID taskid, String groomServer,
       TaskInProgress taskInProgress) {
     LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId()
         + ", for groom '" + groomServer + "'");
@@ -695,9 +701,9 @@ public class BSPMaster implements JobSub
     taskIdToGroomNameMap.put(taskid, groomServer);
 
     // groom --> taskid
-    TreeSet<String> taskset = groomNameToTaskIdsMap.get(groomServer);
+    TreeSet<TaskAttemptID> taskset = groomNameToTaskIdsMap.get(groomServer);
     if (taskset == null) {
-      taskset = new TreeSet<String>();
+      taskset = new TreeSet<TaskAttemptID>();
       groomNameToTaskIdsMap.put(groomServer, taskset);
     }
     taskset.add(taskid);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Tue Dec  7 07:55:39 2010
@@ -22,7 +22,7 @@ public class BSPTask extends Task {
   public BSPTask() {
   }
 
-  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
+  public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskid;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Dec  7 07:55:39 2010
@@ -87,7 +87,7 @@ public class GroomServer implements Runn
   private int maxCurrentTasks = 1;
   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
   /** Map from taskId -> TaskInProgress. */
-  Map<String, TaskInProgress> runningTasks = null;
+  Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<BSPJobID, RunningJob> runningJobs = null;
 
   private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
@@ -122,7 +122,7 @@ public class GroomServer implements Runn
     // Clear out state tables
     this.tasks.clear();
     this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
-    this.runningTasks = new LinkedHashMap<String, TaskInProgress>();
+    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
     this.acceptNewTasks = true;
 
     this.conf.set(Constants.PEER_HOST, localHostname);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Tue Dec  7 07:55:39 2010
@@ -53,7 +53,9 @@ class JobInProgress {
   Path localJobFile = null;
   Path localJarFile = null;
   private LocalFileSystem localFs;
-
+  // Indicates how many times the job got restarted
+  private int restartCount;
+  
   long startTime;
   long launchTime;
   long finishTime;
@@ -66,7 +68,7 @@ class JobInProgress {
 
   int numBSPTasks = 0;
   int clusterSize;
-
+  
   public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
       throws IOException {
     this.conf = conf;
@@ -77,8 +79,10 @@ class JobInProgress {
     this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
     status.setStartTime(startTime);
+    
     this.superstepCounter = 0;
-
+    this.restartCount = 0;
+    
     this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
         + ".xml");
     this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
@@ -191,7 +195,7 @@ class JobInProgress {
   }
 
   public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
-    String taskid = status.getTaskId();
+    TaskAttemptID taskid = status.getTaskId();
     updateTaskStatus(tip, status);
     LOG.info("Taskid '" + taskid + "' has finished successfully.");
     tip.completed(taskid);
@@ -272,4 +276,12 @@ class JobInProgress {
       LOG.info("Error cleaning up " + profile.getJobID() + ": " + e);
     }
   }
+
+  /**
+   * Get the number of times the job has restarted
+   */
+  int getNumRestarts() {
+    return restartCount;
+  }
+  
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java Tue Dec  7 07:55:39 2010
@@ -21,38 +21,35 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.Text;
-
-
 /**
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} 
  * to the {@link org.apache.hama.bsp.GroomServer} to kill a task.
  * 
  */
 class KillTaskAction extends GroomServerAction {
-  String taskId;
+  TaskAttemptID taskId;
   
   public KillTaskAction() {
     super(ActionType.KILL_TASK);
-    taskId = new String();
+    taskId = new TaskAttemptID();
   }
   
-  public KillTaskAction(String killTaskId) {
+  public KillTaskAction(TaskAttemptID killTaskId) {
     super(ActionType.KILL_TASK);
     this.taskId = killTaskId;
   }
 
-  public String getTaskID() {
+  public TaskAttemptID getTaskID() {
     return taskId;
   }
   
   @Override
   public void write(DataOutput out) throws IOException {
-    Text.writeString(out, taskId);
+    taskId.write(out);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    taskId = Text.readString(in);
+    taskId.readFields(in);
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Tue Dec  7 07:55:39 2010
@@ -35,17 +35,17 @@ public abstract class Task implements Wr
   
   protected BSPJobID jobId;
   protected String jobFile;
-  protected String taskId;
+  protected TaskAttemptID taskId;
   protected int partition;
   
   protected LocalDirAllocator lDirAlloc;
 
   public Task() {
     jobId = new BSPJobID();
-    taskId = new String();
+    taskId = new TaskAttemptID();
   }
   
-  public Task(BSPJobID jobId, String jobFile, String taskId, int partition) {
+  public Task(BSPJobID jobId, String jobFile, TaskAttemptID taskId, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskId;
@@ -63,7 +63,7 @@ public abstract class Task implements Wr
     return jobFile; 
   }
   
-  public String getTaskID() {
+  public TaskAttemptID getTaskID() {
     return taskId;
   }
   
@@ -95,7 +95,7 @@ public abstract class Task implements Wr
   public void write(DataOutput out) throws IOException {
     jobId.write(out);
     Text.writeString(out, jobFile);
-    Text.writeString(out, taskId);
+    taskId.write(out);
     out.writeInt(partition);
   }
   
@@ -103,7 +103,7 @@ public abstract class Task implements Wr
   public void readFields(DataInput in) throws IOException {
     jobId.readFields(in);
     jobFile = Text.readString(in);
-    taskId = Text.readString(in);
+    taskId.readFields(in);
     partition = in.readInt();
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java Tue Dec  7 07:55:39 2010
@@ -21,6 +21,9 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+/**
+ * TaskAttemptID is a unique identifier for a task attempt.
+ */
 public class TaskAttemptID extends ID {
   protected static final String ATTEMPT = "attempt";
   private TaskID taskId;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Tue Dec  7 07:55:39 2010
@@ -38,7 +38,8 @@ class TaskInProgress {
   static final int MAX_TASK_EXECS = 1;
   int maxTaskAttempts = 4;
   private boolean failed = false;
-
+  private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
+  
   // Job Meta
   private String jobFile = null;
   private int partition;
@@ -63,13 +64,13 @@ class TaskInProgress {
 
   // Map from task Id -> GroomServer Id, contains tasks that are
   // currently runnings
-  private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
+  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
   // All attempt Ids of this TIP
   // private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
   /**
    * Map from taskId -> TaskStatus
    */
-  private TreeMap<String, TaskStatus> taskStatuses = new TreeMap<String, TaskStatus>();
+  private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
 
   private BSPJobID jobId;
 
@@ -92,9 +93,12 @@ class TaskInProgress {
     Task t = null;
 
     // TODO use the TaskID, instead of String.
-    String taskid = null;
+    TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
-      taskid = new String("task_" + status.getGroomName() + "_" + nextTaskId);
+      int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
+      taskid = new TaskAttemptID( id, attemptId);
+      //new String("task_" + status.getGroomName() + "_" + nextTaskId);
+      
       ++nextTaskId;
     } else {
       LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts)
@@ -131,7 +135,7 @@ class TaskInProgress {
     return id;
   }
 
-  public TreeMap<String, String> getTasks() {
+  public TreeMap<TaskAttemptID, String> getTasks() {
     return activeTasks;
   }
 
@@ -163,9 +167,9 @@ class TaskInProgress {
     return (completes > 0);
   }
 
-  private TreeSet<String> tasksReportedClosed = new TreeSet<String>();
+  private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
 
-  public boolean shouldCloseForClosedJob(String taskid) {
+  public boolean shouldCloseForClosedJob(TaskAttemptID taskid) {
     TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
     if ((ts != null) && (!tasksReportedClosed.contains(taskid))
         && (job.getStatus().getRunState() != JobStatus.RUNNING)) {
@@ -176,8 +180,8 @@ class TaskInProgress {
     }
   }
 
-  public void completed(String taskid) {
-    LOG.info("Task '" + taskid + "' has completed.");
+  public void completed(TaskAttemptID taskid) {
+    LOG.info("Task '" + taskid.getTaskID().toString() + "' has completed.");
     TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
     status.setRunState(TaskStatus.State.SUCCEEDED);
     activeTasks.remove(taskid);
@@ -195,7 +199,7 @@ class TaskInProgress {
     taskStatuses.put(status.getTaskId(), status);
   }
 
-  public TaskStatus getTaskStatus(String taskId) {
+  public TaskStatus getTaskStatus(TaskAttemptID taskId) {
     return this.taskStatuses.get(taskId);
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Tue Dec  7 07:55:39 2010
@@ -40,7 +40,7 @@ class TaskStatus implements Writable, Cl
     RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
   }
 
-  private String taskId;
+  private TaskAttemptID taskId;
   private float progress;
   private volatile State runState;
   private String stateString;
@@ -56,11 +56,11 @@ class TaskStatus implements Writable, Cl
    * 
    */
   public TaskStatus() {
-    taskId = new String();
+    taskId = new TaskAttemptID();
     this.superstepCount = 0;
   }
 
-  public TaskStatus(String taskId, float progress, State runState,
+  public TaskStatus(TaskAttemptID taskId, float progress, State runState,
       String stateString, String groomServer, Phase phase) {
     this.taskId = taskId;
     this.progress = progress;
@@ -75,7 +75,7 @@ class TaskStatus implements Writable, Cl
   // Accessors and Modifiers
   // //////////////////////////////////////////////////
 
-  public String getTaskId() {
+  public TaskAttemptID getTaskId() {
     return taskId;
   }
 
@@ -242,7 +242,7 @@ class TaskStatus implements Writable, Cl
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    this.taskId = Text.readString(in);
+    this.taskId.readFields(in);
     this.progress = in.readFloat();
     this.runState = WritableUtils.readEnum(in, State.class);
     this.stateString = Text.readString(in);
@@ -254,7 +254,7 @@ class TaskStatus implements Writable, Cl
 
   @Override
   public void write(DataOutput out) throws IOException {
-    Text.writeString(out, taskId);
+    taskId.write(out);
     out.writeFloat(progress);
     WritableUtils.writeEnum(out, runState);
     Text.writeString(out, stateString);

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Tue Dec  7 07:55:39 2010
@@ -96,8 +96,7 @@ public class TestBSPPeer extends HamaClu
         peerNames.add("localhost:" + (30000 + i));
       }
       peer.setAllPeerNames(peerNames);
-      TaskStatus currentTaskStatus = new TaskStatus("localhost:"
-          + lastTwoDigitsOfPort, 0, null, null, null, null);
+      TaskStatus currentTaskStatus = new TaskStatus(new TaskAttemptID(), 0, null, null, null, null);
       peer.setCurrentTaskStatus(currentTaskStatus);
       BSPJob jobConf = new BSPJob(conf, NUM_PEER);
       peer.setJobConf((BSPJob) jobConf);