You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/04 11:11:05 UTC

svn commit: r1004154 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/

Author: edwardyoon
Date: Mon Oct  4 09:11:04 2010
New Revision: 1004154

URL: http://svn.apache.org/viewvc?rev=1004154&view=rev
Log:
Implementations of Task progress

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Oct  4 09:11:04 2010
@@ -46,6 +46,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-286: Task progress should be monitored (eddieyoon)
     HAMA-287: BSPMaster should use the bsp.master.port config property
                        when creating its InetSocketAddr instance (Filipe Manana via edwardyoon)
     HAMA-283: Removing duplicate code (Filipe Manana via edwardyoon)
@@ -153,6 +154,8 @@ Trunk (unreleased changes)
 
   BUG FIXES
   
+    HAMA-291: bsp.groom.port is unused and superseded 
+                       by bsp.peer.port (Filipe Manana via edwardyoon)
     HAMA-288: Typo might lead to null tasks getting assigned 
                        to groom servers (Filipe Manana via edwardyoon)
     HAMA-280: Fix warnings (Filipe Manana via edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Mon Oct  4 09:11:04 2010
@@ -38,32 +38,37 @@ import org.apache.hama.ipc.JobSubmission
 
 public class BSPJobClient extends Configured {
   private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
-  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
+
+  public static enum TaskStatusFilter {
+    NONE, KILLED, FAILED, SUCCEEDED, ALL
+  }
+
   private static final long MAX_JOBPROFILE_AGE = 1000 * 2;
 
   class NetworkedJob implements RunningJob {
     JobProfile profile;
     JobStatus status;
     long statustime;
-    
+
     public NetworkedJob(JobStatus job) throws IOException {
       this.status = job;
       this.profile = jobSubmitClient.getJobProfile(job.getJobID());
       this.statustime = System.currentTimeMillis();
     }
-    
+
     /**
-     * Some methods rely on having a recent job profile object.  Refresh
-     * it, if necessary
+     * Some methods rely on having a recent job profile object. Refresh it, if
+     * necessary
      */
     synchronized void ensureFreshStatus() throws IOException {
       if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
         updateStatus();
       }
     }
-    
-    /** Some methods need to update status immediately. So, refresh
-     * immediately
+
+    /**
+     * Some methods need to update status immediately. So, refresh immediately
+     * 
      * @throws IOException
      */
     synchronized void updateStatus() throws IOException {
@@ -71,15 +76,17 @@ public class BSPJobClient extends Config
       this.statustime = System.currentTimeMillis();
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
      * @see org.apache.hama.bsp.RunningJob#getID()
      */
     @Override
-    public BSPJobID getID() {      
+    public BSPJobID getID() {
       return profile.getJobID();
     }
-    
-    /* (non-Javadoc)
+
+    /*
+     * (non-Javadoc)
      * @see org.apache.hama.bsp.RunningJob#getJobName()
      */
     @Override
@@ -87,30 +94,30 @@ public class BSPJobClient extends Config
       return profile.getJobName();
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
      * @see org.apache.hama.bsp.RunningJob#getJobFile()
      */
     @Override
     public String getJobFile() {
       return profile.getJobFile();
     }
-    
+
     @Override
     public float progress() throws IOException {
       ensureFreshStatus();
       return status.progress();
     }
-    
+
     /**
      * Returns immediately whether the whole job is done yet or not.
      */
     public synchronized boolean isComplete() throws IOException {
       updateStatus();
-      return (status.getRunState() == JobStatus.SUCCEEDED ||
-              status.getRunState() == JobStatus.FAILED ||
-              status.getRunState() == JobStatus.KILLED);
+      return (status.getRunState() == JobStatus.SUCCEEDED
+          || status.getRunState() == JobStatus.FAILED || status.getRunState() == JobStatus.KILLED);
     }
-    
+
     /**
      * True iff job completed successfully.
      */
@@ -118,7 +125,7 @@ public class BSPJobClient extends Config
       updateStatus();
       return status.getRunState() == JobStatus.SUCCEEDED;
     }
-    
+
     /**
      * Blocks until the job is finished
      */
@@ -138,7 +145,7 @@ public class BSPJobClient extends Config
       updateStatus();
       return status.getRunState();
     }
-    
+
     /**
      * Tells the service to terminate the current job.
      */
@@ -149,21 +156,21 @@ public class BSPJobClient extends Config
     @Override
     public void killTask(TaskAttemptID taskId, boolean shouldFail)
         throws IOException {
-      jobSubmitClient.killTask(taskId, shouldFail);      
-    }    
+      jobSubmitClient.killTask(taskId, shouldFail);
+    }
   }
-  
+
   private JobSubmissionProtocol jobSubmitClient = null;
   private Path sysDir = null;
   private FileSystem fs = null;
-  
+
   // job files are world-wide readable and owner writable
-  final private static FsPermission JOB_FILE_PERMISSION = 
-    FsPermission.createImmutable((short) 0644); // rw-r--r--
+  final private static FsPermission JOB_FILE_PERMISSION = FsPermission
+      .createImmutable((short) 0644); // rw-r--r--
 
   // job submission directory is world readable/writable/executable
-  final static FsPermission JOB_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
+  final static FsPermission JOB_DIR_PERMISSION = FsPermission
+      .createImmutable((short) 0777); // rwxrwxrwx
 
   public BSPJobClient(Configuration conf) throws IOException {
     setConf(conf);
@@ -171,27 +178,28 @@ public class BSPJobClient extends Config
   }
 
   public void init(Configuration conf) throws IOException {
-    // it will be used to determine if the bspmaster is running on local or not. 
+    // it will be used to determine if the bspmaster is running on local or not.
     String master = conf.get("bsp.master.address", "local");
     if ("local".equals(master)) {
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.jobSubmitClient =  (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-          JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(
-              conf, JobSubmissionProtocol.class));
+      this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
+          JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
+          BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(conf,
+              JobSubmissionProtocol.class));
     }
   }
 
   /**
    * Close the <code>JobClient</code>.
    */
-  public synchronized void close() throws IOException {    
-      RPC.stopProxy(jobSubmitClient);
+  public synchronized void close() throws IOException {
+    RPC.stopProxy(jobSubmitClient);
   }
-  
+
   /**
-   * Get a filesystem handle.  We need this to prepare jobs
-   * for submission to the BSP system.
+   * Get a filesystem handle. We need this to prepare jobs for submission to the
+   * BSP system.
    * 
    * @return the filesystem handle.
    */
@@ -202,22 +210,22 @@ public class BSPJobClient extends Config
     }
     return fs;
   }
-  
-  private UnixUserGroupInformation getUGI(Configuration conf) throws IOException {
+
+  private UnixUserGroupInformation getUGI(Configuration conf)
+      throws IOException {
     UnixUserGroupInformation ugi = null;
     try {
       ugi = UnixUserGroupInformation.login(conf, true);
     } catch (LoginException e) {
-      throw (IOException)(new IOException(
+      throw (IOException) (new IOException(
           "Failed to get the current user's information.").initCause(e));
     }
     return ugi;
   }
-  
+
   /**
-   * Submit a job to the BSP system.
-   * This returns a handle to the {@link RunningJob} which can be used to track
-   * the running-job.
+   * Submit a job to the BSP system. This returns a handle to the
+   * {@link RunningJob} which can be used to track the running-job.
    * 
    * @param job the job configuration.
    * @return a handle to the {@link RunningJob} which can be used to track the
@@ -226,25 +234,24 @@ public class BSPJobClient extends Config
    * @throws IOException
    */
   public RunningJob submitJob(BSPJob job) throws FileNotFoundException,
-                                                  IOException {    
-      return submitJobInternal(job);    
+      IOException {
+    return submitJobInternal(job);
   }
-  
-  public 
-  RunningJob submitJobInternal(BSPJob job) throws IOException {
+
+  public RunningJob submitJobInternal(BSPJob job) throws IOException {
     BSPJobID jobId = jobSubmitClient.getNewJobId();
     Path submitJobDir = new Path(getSystemDir(), jobId.toString());
-    Path submitJarFile = new Path(submitJobDir, "job.jar");    
+    Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitJobFile = new Path(submitJobDir, "job.xml");
-    
+
     LOG.debug("BSPJobClient.submitJobDir: " + submitJobDir);
-    
+
     /*
      * set this user's id in job configuration, so later job files can be
      * accessed using this user's id
      */
     UnixUserGroupInformation ugi = getUGI(job.getConf());
-    
+
     // Create a number of filenames in the BSPMaster's fs namespace
     FileSystem fs = getFs();
     fs.delete(submitJobDir, true);
@@ -253,25 +260,25 @@ public class BSPJobClient extends Config
     FsPermission bspSysPerms = new FsPermission(JOB_DIR_PERMISSION);
     FileSystem.mkdirs(fs, submitJobDir, bspSysPerms);
     fs.mkdirs(submitJobDir);
-    short replication = (short)job.getInt("bsp.submit.replication", 10);
-    
+    short replication = (short) job.getInt("bsp.submit.replication", 10);
+
     String originalJarPath = job.getJar();
 
     if (originalJarPath != null) { // copy jar to BSPMaster's fs
-      // use jar name if job is not named. 
-      if ("".equals(job.getJobName())){
+      // use jar name if job is not named.
+      if ("".equals(job.getJobName())) {
         job.setJobName(new Path(originalJarPath).getName());
       }
       job.setJar(submitJarFile.toString());
       fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
-      
+
       fs.setReplication(submitJarFile, replication);
       fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
     } else {
-      LOG.warn("No job jar file set.  User classes may not be found. "+
-               "See BSPJob#setJar(String) or check Your jar file.");
+      LOG.warn("No job jar file set.  User classes may not be found. "
+          + "See BSPJob#setJar(String) or check Your jar file.");
     }
-    
+
     // Set the user's name and working directory
     job.setUser(ugi.getUserName());
     if (ugi.getGroupNames().length > 0) {
@@ -280,22 +287,22 @@ public class BSPJobClient extends Config
     if (job.getWorkingDirectory() == null) {
       job.setWorkingDirectory(fs.getWorkingDirectory());
     }
-    
-    // Write job file to BSPMaster's fs        
-    FSDataOutputStream out = 
-      FileSystem.create(fs, submitJobFile,
-                        new FsPermission(JOB_FILE_PERMISSION));
-    
+
+    // Write job file to BSPMaster's fs
+    FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
+        new FsPermission(JOB_FILE_PERMISSION));
+
     try {
       job.writeXml(out);
     } finally {
       out.close();
     }
-    
+
     //
     // Now, actually submit the job (using the submit name)
     //
-    JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString());
+    JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile
+        .toString());
     if (status != null) {
       return new NetworkedJob(status);
     } else {
@@ -304,7 +311,7 @@ public class BSPJobClient extends Config
   }
 
   /**
-   * Monitor a job and print status in real-time as progress is made and tasks 
+   * Monitor a job and print status in real-time as progress is made and tasks
    * fail.
    * 
    * @param job
@@ -313,29 +320,30 @@ public class BSPJobClient extends Config
    * @throws IOException
    * @throws InterruptedException
    */
-  public boolean monitorAndPrintJob (BSPJob job, RunningJob info) 
-    throws IOException, InterruptedException {
-    
+  public boolean monitorAndPrintJob(BSPJob job, RunningJob info)
+      throws IOException, InterruptedException {
+
     String lastReport = null;
     BSPJobID jobId = job.getJobID();
     LOG.info("Running job: " + jobId);
-    
+
     while (!job.isComplete()) {
       Thread.sleep(1000);
       String report = " bsp " + StringUtils.formatPercent(job.progress(), 0);
-      
+
       if (!report.equals(lastReport)) {
         LOG.info(report);
         lastReport = report;
-      }      
+      }
     }
-    
+
     LOG.info("Job complete: " + jobId);
     return job.isSuccessful();
   }
-  
+
   /**
-   * Grab the bspmaster system directory path where job-specific files are to be placed.
+   * Grab the bspmaster system directory path where job-specific files are to be
+   * placed.
    * 
    * @return the system directory where job-specific files are to be placed.
    */
@@ -346,20 +354,33 @@ public class BSPJobClient extends Config
     return sysDir;
   }
 
-  public static RunningJob runJob(BSPJob job) throws FileNotFoundException,
-  IOException {
+  public static void runJob(BSPJob job) throws FileNotFoundException,
+      IOException {
     BSPJobClient jc = new BSPJobClient(job.getConf());
-    return jc.submitJobInternal(job);    
+    RunningJob running = jc.submitJobInternal(job);
+    String jobId = running.getID().toString();
+    LOG.info("Running job: " + jobId);
+
+    while (true) {
+      if (running.isComplete()) {
+        LOG.info("Job complete: " + jobId);
+        break;
+      }
+    }
+
+    // TODO if error found, kill job
+    //running.killJob();
+    jc.close();
   }
-  
+
   /**
    * Get status information about the BSP cluster
    * 
    * @throws IOException
    */
   public ClusterStatus getClusterStatus() throws IOException {
-    // TODO: 
-    
+    // TODO:
+
     return null;
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Mon Oct  4 09:11:04 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -240,7 +241,7 @@ public class BSPMaster implements JobSub
     String hamaMasterStr = conf.get("bsp.master.address", "localhost");
     int defaultPort = conf.getInt("bsp.master.port", 40000);
 
-   return NetUtils.createSocketAddr(hamaMasterStr, defaultPort);
+    return NetUtils.createSocketAddr(hamaMasterStr, defaultPort);
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -341,7 +342,7 @@ public class BSPMaster implements JobSub
   @Override
   public HeartbeatResponse heartbeat(GroomServerStatus status,
       boolean restarted, boolean initialContact, boolean acceptNewTasks,
-      short responseId) throws IOException {
+      short responseId, int reportSize) throws IOException {
 
     // First check if the last heartbeat response got through
     String groomName = status.getGroomName();
@@ -364,6 +365,8 @@ public class BSPMaster implements JobSub
     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
     List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
 
+    updateTaskStatuses(status);
+
     // Check for new tasks to be executed on the groom server
     if (acceptNewTasks) {
       GroomServerStatus groomStatus = getGroomServer(groomName);
@@ -371,21 +374,21 @@ public class BSPMaster implements JobSub
         LOG.warn("Unknown task tracker polling; ignoring: " + groomName);
       } else {
         List<Task> taskList = taskScheduler.assignTasks(groomStatus);
-        LOG.debug("BSPMaster.heartbeat.taskSize: " + taskList.size());
+
         for (Task task : taskList) {
           if (task != null) {
-            actions.add(new LaunchTaskAction(task));
+
+            if (!jobs.get(task.getJobID()).getStatus().isJobComplete()) {
+              if (jobs.get(task.getJobID()).getStatus().getRunState() != JobStatus.RUNNING) {
+                actions.add(new LaunchTaskAction(task));
+              }
+            }
+
           }
         }
       }
     }
 
-    // Check for tasks to be killed
-    List<GroomServerAction> killTasksList = getTasksToKill(groomName);
-    if (killTasksList != null) {
-      actions.addAll(killTasksList);
-    }
-
     response.setActions(actions.toArray(new GroomServerAction[actions.size()]));
 
     groomToHeartbeatResponseMap.put(groomName, response);
@@ -394,6 +397,31 @@ public class BSPMaster implements JobSub
     return response;
   }
 
+  void updateTaskStatuses(GroomServerStatus status) {
+    for (Iterator<TaskStatus> it = status.taskReports(); it.hasNext();) {
+      TaskStatus report = it.next();
+      report.setGroomServer(status.getGroomName());
+      String taskId = report.getTaskId();
+      TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+
+      if (tip == null) {
+        LOG.info("Serious problem.  While updating status, cannot find taskid "
+            + report.getTaskId());
+      } else {
+        JobInProgress job = tip.getJob();
+
+        if (report.getRunState() == TaskStatus.State.SUCCEEDED) {
+          job.completedTask(tip, report);
+        } else if (report.getRunState() == TaskStatus.State.FAILED) {
+          // Tell the job to fail the relevant task
+        } else {
+          job.updateTaskStatus(tip, report);
+        }
+      }
+
+    }
+  }
+
   // (trackerID -> TreeSet of completed taskids running at that tracker)
   TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap<String, Set<String>>();
 
@@ -593,6 +621,7 @@ public class BSPMaster implements JobSub
 
   private synchronized void killJob(JobInProgress job) {
     LOG.info("Killing job " + job.getJobID());
+    job.kill();
   }
 
   @Override
@@ -626,21 +655,20 @@ public class BSPMaster implements JobSub
       TaskInProgress taskInProgress) {
     LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId()
         + ", for tracker '" + groomServer + "'");
-    /*
+
     // taskid --> tracker
-    taskidToTrackerMap.put(taskid, taskTracker);
+    taskidToTrackerMap.put(taskid, groomServer);
 
     // tracker --> taskid
-    TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);
+    TreeSet taskset = (TreeSet) trackerToTaskMap.get(groomServer);
     if (taskset == null) {
-        taskset = new TreeSet();
-        trackerToTaskMap.put(taskTracker, taskset);
+      taskset = new TreeSet();
+      trackerToTaskMap.put(groomServer, taskset);
     }
     taskset.add(taskid);
 
     // taskid --> TIP
-    taskidToTIPMap.put(taskid, tip);
-    */
+    taskidToTIPMap.put(taskid, taskInProgress);
   }
 
   public static void main(String[] args) {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Oct  4 09:11:04 2010
@@ -170,6 +170,10 @@ public class BSPPeer implements Watcher,
         peer.put(messages.next());
       }
     }
+    
+    // Should we be clearing outgoingQueues?
+    this.outgoingQueues.clear();
+    
     enterBarrier();
     Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work
                                                // because
@@ -228,7 +232,7 @@ public class BSPPeer implements Watcher,
 
   @Override
   public void close() throws IOException {
-
+    server.stop();
   }
 
   @Override

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Mon Oct  4 09:11:04 2010
@@ -7,7 +7,7 @@ public class BSPTask extends Task {
   private BSP bsp;
   private Configuration conf;
   
-  public BSPTask(String jobId, String jobFile, String taskid, int partition, Configuration conf) {
+  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskid;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Mon Oct  4 09:11:04 2010
@@ -82,10 +82,10 @@ public class GroomServer implements Runn
   // Job
   boolean acceptNewTasks = true;
   private int failures;
-  private int maxCurrentTasks;
+  private int maxCurrentTasks = 1;
   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
   /** Map from taskId -> TaskInProgress. */
-  Map<TaskAttemptID, TaskInProgress> runningTasks = null;
+  Map<String, TaskInProgress> runningTasks = null;
   Map<BSPJobID, RunningJob> runningJobs = null;
 
   private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
@@ -120,7 +120,7 @@ public class GroomServer implements Runn
     // Clear out state tables
     this.tasks.clear();
     this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
-    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    this.runningTasks = new LinkedHashMap<String, TaskInProgress>();
     this.acceptNewTasks = true;
 
     this.groomServerName = "groomd_" + localHostname;
@@ -221,6 +221,8 @@ public class GroomServer implements Runn
             + ((actions != null) ? actions.length : 0) + " actions");
 
         if (actions != null) {
+          acceptNewTasks = false;
+          
           for (GroomServerAction action : actions) {
             if (action instanceof LaunchTaskAction) {
               startNewTask((LaunchTaskAction) action);
@@ -264,9 +266,10 @@ public class GroomServer implements Runn
   }
 
   private void startNewTask(LaunchTaskAction action) {
-    TaskInProgress tip = new TaskInProgress(action.getTask());
+    TaskInProgress tip = new TaskInProgress(action.getTask(), this.groomServerName);
     synchronized (tip) {
       try {
+        runningTasks.put(action.getTask().getTaskID(), tip);
         tip.launchTask();
       } catch (Throwable ie) {
         // TODO: when job failed.
@@ -293,7 +296,21 @@ public class GroomServer implements Runn
 
     // TODO - Later, acceptNewTask is to be set by the status of groom server.
     HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
-        justStarted, justInited, acceptNewTasks, heartbeatResponseId);
+        justStarted, justInited, acceptNewTasks, heartbeatResponseId, status.getTaskReports().size());
+      
+    
+    synchronized (this) {
+      for (TaskStatus taskStatus : status.getTaskReports()) {
+        if(taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+          LOG.debug("Removing task from runningTasks: " + taskStatus.getTaskId());
+          runningTasks.remove(taskStatus.getTaskId());
+        }
+      }
+    }
+      
+    // Force a rebuild of 'status' on the next iteration
+    status = null;
+    
     return heartbeatResponse;
   }
 
@@ -387,18 +404,19 @@ public class GroomServer implements Runn
     volatile boolean wasKilled = false;
     private TaskStatus taskStatus;
 
-    public TaskInProgress(Task task) {
+    public TaskInProgress(Task task, String groomServer) {
       this.task = task;
+      this.taskStatus = new TaskStatus(task.getTaskID(), 0, TaskStatus.State.UNASSIGNED, "running", groomServer, TaskStatus.Phase.STARTING);
     }
 
     static final String SUBDIR = "groomServer";
     
     public void launchTask() {
-      // until job is completed, don't accept new task
-      acceptNewTasks = false;
-
+      taskStatus.setRunState(TaskStatus.State.RUNNING);
+      
       try {
         // TODO: need to move this code to TaskRunner
+        
         task.getJobFile();
         conf.addResource(task.getJobFile());
         BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
@@ -408,14 +426,12 @@ public class GroomServer implements Runn
         Path localJarFile =
           defaultJobConf.getLocalPath(SUBDIR+"/"+task.getTaskID()+"/"+"job.jar");
         
-        LOG.debug("localJobFile: "+ localJobFile);
-        
         systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
         systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml", ".jar")), localJarFile);
 
         HamaConfiguration conf = new HamaConfiguration();
         conf.addResource(localJobFile);
-        BSPJob jobConf = new BSPJob(conf, task.getJobID());
+        BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
         jobConf.setJar(localJarFile.toString());
         
         BSP bsp = (BSP) ReflectionUtils.newInstance(jobConf.getBspClass(), conf);
@@ -425,9 +441,24 @@ public class GroomServer implements Runn
       } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
+      } finally {
+        
+        while (true) {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+          
+          // If local/outgoing queues are empty, task is done.
+          if(bspPeer.localQueue.size() == 0 && bspPeer.outgoingQueues.size() == 0) {
+            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+            acceptNewTasks = true;
+            break;
+          }
+        }
       }
 
-      // TODO: report the task status
     }
 
     /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Mon Oct  4 09:11:04 2010
@@ -18,11 +18,14 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 
 /*************************************************************
@@ -51,6 +54,7 @@ class JobInProgress {
   Path jobFile = null;
   Path localJobFile = null;
   Path localJarFile = null;
+  private LocalFileSystem localFs;
 
   long startTime;
   long launchTime;
@@ -58,14 +62,17 @@ class JobInProgress {
 
   // private LocalFileSystem localFs;
   private BSPJobID jobId;
-
   final BSPMaster master;
+  List<TaskInProgress> tasks;
 
   public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
       throws IOException {
     this.conf = conf;
     this.jobId = jobId;
 
+    this.tasks = new ArrayList<TaskInProgress>();
+    this.localFs = (LocalFileSystem) FileSystem.getNamed("local", conf);
+
     this.master = master;
     this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
@@ -76,10 +83,7 @@ class JobInProgress {
         + ".xml");
     this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
         + ".jar");
-    
-    LOG.debug("JobInProgress.localJobFile: " + this.localJobFile);
-    LOG.debug("JobInProgress.localJarFile: " + this.localJarFile);
-    
+
     Path jobDir = master.getSystemDirectoryForJob(jobId);
     FileSystem fs = jobDir.getFileSystem(conf);
     jobFile = new Path(jobDir, "job.xml");
@@ -93,6 +97,7 @@ class JobInProgress {
     if (jarFile != null) {
       fs.copyToLocalFile(new Path(jarFile), localJarFile);
     }
+    
   }
 
   // ///////////////////////////////////////////////////
@@ -136,14 +141,83 @@ class JobInProgress {
   // ///////////////////////////////////////////////////
   public synchronized Task obtainNewTask(GroomServerStatus status,
       int clusterSize, int numUniqueHosts) {
+    LOG.debug("clusterSize: " + clusterSize);
+    
     Task result = null;
     try {
-      result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this,
-          numUniqueHosts).getTaskToRun(status);
+      TaskInProgress tip = new TaskInProgress(getJobID(), this.jobFile
+          .toString(), this.master, this.conf, this, numUniqueHosts);
+      tasks.add(tip);
+      result = tip.getTaskToRun(status);
     } catch (IOException e) {
       e.printStackTrace();
     }
 
     return result;
   }
+
+  public void completedTask(TaskInProgress tip, TaskStatus status) {
+    String taskid = status.getTaskId();
+    updateTaskStatus(tip, status);
+    LOG.info("Taskid '" + taskid + "' has finished successfully.");
+    tip.completed(taskid);
+
+    //
+    // If all tasks are complete, then the job is done!
+    //
+
+    boolean allDone = true;
+    for (TaskInProgress taskInProgress : tasks) {
+      if (!taskInProgress.isComplete()) {
+        allDone = false;
+        break;
+      }
+    }
+
+    this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
+        JobStatus.RUNNING);
+    
+    if(allDone) {
+      LOG.debug("Job successfully done.");
+      
+      this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
+          JobStatus.SUCCEEDED);
+      garbageCollect();        
+    }
+  }
+
+  public void updateTaskStatus(TaskInProgress tip, TaskStatus status) {
+    tip.updateStatus(status); // update tip
+  }
+
+  public void kill() {
+    // TODO: not implemented yet; this method currently does nothing.
+
+  }
+
+  /**
+   * The job is done. Clean up after it by deleting the local copies of the
+   * job file and jar file, and the directory that contains the job file.
+   */
+  synchronized void garbageCollect() {
+    try {
+      // Definitely remove the local-disk copy of the job file
+      if (localJobFile != null) {
+        localFs.delete(localJobFile, true);
+        localJobFile = null;
+      }
+      if (localJarFile != null) {
+        localFs.delete(localJarFile, true);
+        localJarFile = null;
+      }
+
+      // JobClient always creates a new directory with the job files,
+      // so we remove that directory to clean up.
+      FileSystem fs = FileSystem.get(conf);
+      fs.delete(new Path(profile.getJobFile()).getParent(), true);
+
+    } catch (IOException e) {
+      LOG.info("Error cleaning up " + profile.getJobID() + ": " + e);
+    }
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Mon Oct  4 09:11:04 2010
@@ -155,7 +155,7 @@ public class LocalJobRunner implements J
 
           try {
             GroomServer servers = new GroomServer(conf);
-            Task task = new BSPTask(job.getJobID().getJtIdentifier(), jobFile, tID.toString(), i, this.conf);
+            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
             servers.assignTask(task);
             LOG.info("Adding task '" + tID.toString() + "' for '" + servers.getServerName() + "'");
             

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Oct  4 09:11:04 2010
@@ -39,7 +39,7 @@ class SimpleTaskScheduler extends TaskSc
         + (jobQueue.size() + 1) + ")");
     jobQueue.add(job);
   }
-  
+
   // removes job
   public void removeJob(JobInProgress job) {
     jobQueue.remove(job);
@@ -77,23 +77,26 @@ class SimpleTaskScheduler extends TaskSc
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
 
-          /*
-           * if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue;
-           * }
-           */
-
-          Task t = null;
-
-          t = job.obtainNewTask(groomStatus, numGroomServers,
-              groomServerManager.getNumberOfUniqueHosts());
-
-          if (t != null) {
-            assignedTasks.add(t);
-            break; // TODO - Now, simple scheduler assigns only one task to
-            // each groom. Later, it will be improved for scheduler to
-            // assign one or more tasks to each groom according to
-            // its capacity.
+          if (!job.getStatus().isJobComplete() && job.getStatus().getRunState() != JobStatus.RUNNING) {
+            /*
+             * if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+             * continue; }
+             */
+
+            Task t = null;
+
+            t = job.obtainNewTask(groomStatus, numGroomServers,
+                groomServerManager.getNumberOfUniqueHosts());
+
+            if (t != null) {
+              assignedTasks.add(t);
+              break; // TODO - For now, the simple scheduler assigns only one
+              // task to each groom. Later, the scheduler should be improved
+              // to assign one or more tasks to each groom according to
+              // its capacity.
+            }
           }
+          
         }
       }
     }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Mon Oct  4 09:11:04 2010
@@ -36,7 +36,7 @@ public class Task implements Writable {
   // Fields
   ////////////////////////////////////////////
   
-  protected String jobId;
+  protected BSPJobID jobId;
   protected String jobFile;
   protected String taskId;
   protected int partition;
@@ -44,10 +44,11 @@ public class Task implements Writable {
   protected LocalDirAllocator lDirAlloc;
 
   public Task() {
+    jobId = new BSPJobID();
     taskId = new String();
   }
   
-  public Task(String jobId, String jobFile, String taskId, int partition) {
+  public Task(BSPJobID jobId, String jobFile, String taskId, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskId;
@@ -73,7 +74,7 @@ public class Task implements Writable {
    * Get the job name for this task.
    * @return the job name
    */
-  public String getJobID() {
+  public BSPJobID getJobID() {
     return jobId;
   }
   
@@ -95,7 +96,7 @@ public class Task implements Writable {
   ////////////////////////////////////////////
   @Override
   public void write(DataOutput out) throws IOException {
-    Text.writeString(out, jobId);
+    jobId.write(out);
     Text.writeString(out, jobFile);
     Text.writeString(out, taskId);
     out.writeInt(partition);
@@ -103,7 +104,7 @@ public class Task implements Writable {
   
   @Override
   public void readFields(DataInput in) throws IOException {
-    jobId = Text.readString(in);
+    jobId.readFields(in);
     jobFile = Text.readString(in);
     taskId = Text.readString(in);
     partition = in.readInt();

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Mon Oct  4 09:11:04 2010
@@ -68,7 +68,7 @@ class TaskInProgress {
   /**
    * Map from taskId -> TaskStatus
    */
-  private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
+  private TreeMap<String, TaskStatus> taskStatuses = new TreeMap<String, TaskStatus>();
 
   private BSPJobID jobId;
 
@@ -92,7 +92,7 @@ class TaskInProgress {
       
       String taskid = null;
       if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
-        taskid = new String("task_" + nextTaskId);
+        taskid = new String("task_" + status.getGroomName() + "_" + nextTaskId);
         ++nextTaskId;
       } else {
         LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) + 
@@ -100,8 +100,7 @@ class TaskInProgress {
         return null;
       }
 
-      //this.conf.set(Constants.PEER_PORT, String.valueOf(30000));
-      t = new BSPTask(jobId.getJtIdentifier(), jobFile, taskid, partition, this.conf);
+      t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
       activeTasks.put(taskid, status.getGroomName());
 
       // Ask JobTracker to note that the task exists
@@ -127,9 +126,13 @@ class TaskInProgress {
   }
 
   public TaskID getTIPId() {
-    return this.id;
+    return id;
   }
 
+  public TreeMap<String, String> getTasks() {
+    return activeTasks;
+  }
+  
   /**
    * Is the Task associated with taskid is the first attempt of the tip?
    * 
@@ -171,4 +174,23 @@ class TaskInProgress {
         return false;
     }
   }
+
+  public void completed(String taskid) {
+    LOG.info("Task '" + taskid + "' has completed.");
+    TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+    status.setRunState(TaskStatus.State.SUCCEEDED);
+    activeTasks.remove(taskid);
+
+    //
+    // Now that the TIP is complete, the other speculative 
+    // subtasks will be closed when the owning tasktracker 
+    // reports in and calls shouldClose() on this object.
+    //
+
+    this.completes++;
+  }
+
+  public void updateStatus(TaskStatus status) {
+    taskStatuses.put(status.getTaskId(), status);
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Mon Oct  4 09:11:04 2010
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
-class TaskStatus implements Writable {
+class TaskStatus implements Writable, Cloneable {
   static final Log LOG = LogFactory.getLog(TaskStatus.class);
 
   // enumeration for reporting current phase of a task.
@@ -40,7 +40,7 @@ class TaskStatus implements Writable {
     RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
   }
 
-  private final TaskAttemptID taskId;
+  private String taskId;
   private float progress;
   private volatile State runState;
   private String stateString;
@@ -55,10 +55,10 @@ class TaskStatus implements Writable {
    * 
    */
   public TaskStatus() {
-    taskId = new TaskAttemptID();
+    taskId = new String();
   }
 
-  public TaskStatus(TaskAttemptID taskId, float progress, State runState,
+  public TaskStatus(String taskId, float progress, State runState,
       String stateString, String groomServer, Phase phase) {
     this.taskId = taskId;
     this.progress = progress;
@@ -72,7 +72,7 @@ class TaskStatus implements Writable {
   // Accessors and Modifiers
   // //////////////////////////////////////////////////
 
-  public TaskAttemptID getTaskId() {
+  public String getTaskId() {
     return taskId;
   }
 
@@ -225,7 +225,7 @@ class TaskStatus implements Writable {
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    this.taskId.readFields(in);
+    this.taskId = Text.readString(in);
     this.progress = in.readFloat();
     this.runState = WritableUtils.readEnum(in, State.class);
     this.stateString = Text.readString(in);
@@ -236,7 +236,7 @@ class TaskStatus implements Writable {
 
   @Override
   public void write(DataOutput out) throws IOException {
-    taskId.write(out);
+    Text.writeString(out, taskId);
     out.writeFloat(progress);
     WritableUtils.writeEnum(out, runState);
     Text.writeString(out, stateString);

Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java Mon Oct  4 09:11:04 2010
@@ -25,7 +25,7 @@ import org.apache.hama.bsp.HeartbeatResp
 public interface InterTrackerProtocol extends HamaRPCProtocolVersion {
   public HeartbeatResponse heartbeat(GroomServerStatus status, 
       boolean restarted, boolean initialContact, boolean acceptNewTasks,
-      short responseId) throws IOException;
+      short responseId, int reportSize) throws IOException;
 
   public String getSystemDir();
 }