You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/07/01 07:25:55 UTC

svn commit: r959515 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/ src/java/org/apache/hama/util/

Author: edwardyoon
Date: Thu Jul  1 05:25:54 2010
New Revision: 959515

URL: http://svn.apache.org/viewvc?rev=959515&view=rev
Log:
Add "serialize printing" to examples

Added:
    incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
    incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Jul  1 05:25:54 2010
@@ -4,6 +4,7 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    HAMA-279: Add "serialize printing" to examples (edwardyoon)
     HAMA-272: Hama/Zookeeper Integration (edwardyoon)
     HAMA-265: Add example Pi estimatior based on BSP (edwardyoon)
     HAMA-266: Add Ant target to generate the UML class diagrams (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Thu Jul  1 05:25:54 2010
@@ -29,7 +29,6 @@ import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.zookeeper.KeeperException;
-import org.mortbay.log.Log;
 
 public class PiEstimator {
 
@@ -54,8 +53,6 @@ public class PiEstimator {
       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
       BSPMessage estimate = new BSPMessage(tagName, myData);
 
-      Log.info("Send a message (" + Bytes.toDouble(myData) + ") from "
-          + bspPeer.getServerName() + " to localhost:30000");
       bspPeer.send(new InetSocketAddress("localhost", 30000), estimate);
       bspPeer.sync();
 
@@ -88,7 +85,7 @@ public class PiEstimator {
     HamaConfiguration conf = new HamaConfiguration();
     // Execute locally
     conf.set("bsp.master.address", "local");
-
+    
     BSPJob bsp = new BSPJob(conf, PiEstimator.class);
     // Set the job name
     bsp.setJobName("pi estimation example");

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=959515&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Thu Jul  1 05:25:54 2010
@@ -0,0 +1,62 @@
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.zookeeper.KeeperException;
+
+public class SerializePrinting {
+  
+  public static class HelloBSP extends BSP {
+    private Configuration conf;
+
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+        InterruptedException {
+      int num = Integer.parseInt(conf.get("bsp.peers.num"));
+
+      for (int i = 0; i < num; i++) {
+        if (bspPeer.getId() == i) {
+          System.out.println("Hello BSP from " + i + " of " + num + ": "
+              + bspPeer.getServerName());
+        }
+
+        Thread.sleep(100);
+        bspPeer.sync();
+      }
+
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+  }
+
+  public static void main(String[] args) throws InterruptedException,
+      IOException {
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+    // Execute locally
+    conf.set("bsp.master.address", "local");
+
+    BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
+    // Set the job name
+    bsp.setJobName("serialize printing");
+    bsp.setBspClass(HelloBSP.class);
+
+    bsp.setNumBspTask(5);
+    BSPJobClient.runJob(bsp);
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Thu Jul  1 05:25:54 2010
@@ -139,4 +139,5 @@ public interface Constants {
   public static final String BLOCK = "block";
   
   public static final Text ROWCOUNT= new Text("row");
+  public static final String PEER_ID = "bsp.peer.id";
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Thu Jul  1 05:25:54 2010
@@ -19,7 +19,6 @@ package org.apache.hama.bsp;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import javax.security.auth.login.LoginException;
 
@@ -35,7 +34,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.bsp.BSPMaster;
 import org.apache.hama.ipc.JobSubmissionProtocol;
 
 public class BSPJobClient extends Configured {
@@ -183,17 +181,12 @@ public class BSPJobClient extends Config
     if ("local".equals(master)) {
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+      this.jobSubmitClient =  (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+          JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(
+              conf, JobSubmissionProtocol.class));
     }
   }
 
-  private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
-      Configuration conf) throws IOException {
-    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-        JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
-            conf, JobSubmissionProtocol.class));
-  }
-  
   /**
    * Close the <code>JobClient</code>.
    */
@@ -349,7 +342,7 @@ public class BSPJobClient extends Config
       job.set("group.name", ugi.getGroupNames()[0]);
     }
     if (job.getWorkingDirectory() == null) {
-      job.setWorkingDirectory(fs.getWorkingDirectory());          
+      job.setWorkingDirectory(fs.getWorkingDirectory());
     }
     
     // Write job file to BSPMaster's fs        

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Thu Jul  1 05:25:54 2010
@@ -25,9 +25,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +54,7 @@ import org.apache.hama.ipc.JobSubmission
  * BSPMaster is responsible to control all the groom servers and to manage bsp
  * jobs.
  */
-public class BSPMaster extends Thread implements JobSubmissionProtocol, InterTrackerProtocol,
+public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol,
     GroomServerManager {
 
   static {
@@ -67,14 +72,15 @@ public class BSPMaster extends Thread im
   }
 
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
-
+  public static final long GROOMSERVER_EXPIRY_INTERVAL = 10 * 60 * 1000;
+  
   // States
   State state = State.INITIALIZING;
 
   // Attributes
   String masterIdentifier;
   private Server interTrackerServer;
-  
+
   // Filesystem
   static final String SUBDIR = "bspMaster";
   FileSystem fs = null;
@@ -100,10 +106,81 @@ public class BSPMaster extends Thread im
   private Map<BSPJobID, JobInProgress> jobs = new TreeMap<BSPJobID, JobInProgress>();
   private TaskScheduler taskScheduler;
 
-  /*
-   * private final List<JobInProgressListener> jobInProgressListeners = new
-   * CopyOnWriteArrayList<JobInProgressListener>();
-   */
+  ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
+  Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
+      "expireLaunchingTasks");
+
+  private class ExpireLaunchingTasks implements Runnable {
+    private volatile boolean shouldRun = true;
+    private Map<String, Long> launchingTasks = new LinkedHashMap<String, Long>();
+
+    @Override
+    public void run() {
+      while (shouldRun) {
+        long now = System.currentTimeMillis();
+        
+        synchronized (BSPMaster.this) {
+          synchronized (launchingTasks) {
+            Iterator<Entry<String, Long>> itr = launchingTasks
+                .entrySet().iterator();
+            while (itr.hasNext()) {
+              Map.Entry<String, Long> pair = itr.next();
+              String taskId = pair.getKey(); 
+              long age = now - ((Long) pair.getValue()).longValue();
+              LOG.debug(taskId + " is " + age + " ms debug.");
+              
+              LOG.info(taskId);
+              if (age > GROOMSERVER_EXPIRY_INTERVAL) {
+                LOG.info("Launching task " + taskId + " timed out.");
+                TaskInProgress tip = null;
+                tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+                if (tip != null) {
+                  JobInProgress job = tip.getJob();
+                  String groomName = getAssignedTracker(taskId);
+                  GroomServerStatus trackerStatus = 
+                    getGroomServer(groomName);
+                  // This might happen when the tasktracker has already
+                  // expired and this thread tries to call failedtask
+                  // again. expire tasktracker should have called failed
+                  // task!
+                  if (trackerStatus != null) {
+                    /*
+                    job.failedTask(tip, taskId, "Error launching task", 
+                                   tip.isMapTask()? TaskStatus.Phase.MAP:
+                                     TaskStatus.Phase.STARTING,
+                                   trackerStatus.getHost(), trackerName,
+                                   myMetrics);
+                  */
+                  }
+                }
+                itr.remove();
+              } else {
+                // the tasks are sorted by start time, so once we find
+                // one that we want to keep, we are done for this cycle.
+                break;
+              }
+              
+            }
+          }
+        }
+      }
+    }
+
+    private String getAssignedTracker(String taskId) {
+      return taskidToTrackerMap.get(taskId);
+    }
+
+    public void addNewTask(String string) {
+      synchronized (launchingTasks) {
+        launchingTasks.put(string, new Long(System.currentTimeMillis()));
+      }
+    }
+    
+    public void stop() {
+      shouldRun = false;
+    }
+
+  }
 
   /**
    * Start the BSPMaster process, listen on the indicated hostname/port
@@ -116,6 +193,7 @@ public class BSPMaster extends Thread im
       InterruptedException {
     this.conf = conf;
     this.masterIdentifier = identifier;
+    //expireLaunchingTaskThread.start();
 
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass = conf.getClass(
@@ -228,8 +306,8 @@ public class BSPMaster extends Thread im
     return conf.getLocalPath("bsp.local.dir", pathString);
   }
 
-  public BSPMaster startMaster() throws IOException,
-      InterruptedException {
+  public static BSPMaster startMaster(HamaConfiguration conf)
+      throws IOException, InterruptedException {
     return startTracker(conf, generateNewIdentifier());
   }
 
@@ -238,6 +316,7 @@ public class BSPMaster extends Thread im
 
     BSPMaster result = null;
     result = new BSPMaster(conf, identifier);
+    result.taskScheduler.setGroomServerManager(result);
 
     return result;
   }
@@ -279,37 +358,6 @@ public class BSPMaster extends Thread im
     LOG.info("Stopped interTrackerServer");
   }
 
-  public static void main(String[] args) {
-    StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
-    if (args.length != 0) {
-      System.out.println("usage: HamaMaster");
-      System.exit(-1);
-    }
-
-    try {
-      HamaConfiguration conf = new HamaConfiguration();
-      conf.set("bsp.local.dir", conf.get("hama.tmp.dir") + "/bsp/local");
-
-      BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf);
-      master.start();
-    } catch (Throwable e) {
-      LOG.fatal(StringUtils.stringifyException(e));
-      System.exit(-1);
-    }
-  }
-
-  public void run() {
-    try {
-      offerService();
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } 
-  }
-  
   // //////////////////////////////////////////////////
   // GroomServerManager
   // //////////////////////////////////////////////////
@@ -330,8 +378,7 @@ public class BSPMaster extends Thread im
 
   @Override
   public JobInProgress getJob(BSPJobID jobid) {
-    // TODO Auto-generated method stub
-    return null;
+    return jobs.get(jobid);
   }
 
   @Override
@@ -343,13 +390,12 @@ public class BSPMaster extends Thread im
   @Override
   public int getNumberOfUniqueHosts() {
     // TODO Auto-generated method stub
-    return 0;
+    return 1;
   }
 
   @Override
   public Collection<GroomServerStatus> grooms() {
-    // TODO Auto-generated method stub
-    return null;
+    return groomServers.values();
   }
 
   @Override
@@ -387,11 +433,6 @@ public class BSPMaster extends Thread im
   public HeartbeatResponse heartbeat(GroomServerStatus status,
       boolean restarted, boolean initialContact, boolean acceptNewTasks,
       short responseId) throws IOException {
-    LOG.debug(">>> Received the heartbeat message from ");
-    LOG.debug(">>> " + status.groomName + "(" + status.getHost() + ")");
-    LOG.debug(">>> restarted:" + restarted + ",first:" + initialContact);
-    LOG.debug(">>> maxTaskCapacity:" + status.getMaxTasks() + ",taskCapacity:"
-        + status.getTaskReports().size());
 
     // First check if the last heartbeat response got through
     String groomName = status.getGroomName();
@@ -412,7 +453,7 @@ public class BSPMaster extends Thread im
     }
 
     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
-    // List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
+    List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
 
     // Check for new tasks to be executed on the groom server
     if (acceptNewTasks) {
@@ -420,19 +461,102 @@ public class BSPMaster extends Thread im
       if (groomStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + groomName);
       } else {
-        // TODO - assignTasks should be implemented
-        /*
-         * List<Task> tasks = taskScheduler.assignTasks(groomStatus); for(Task
-         * task : tasks) { if(tasks != null) { LOG.debug(groomName +
-         * "-> LaunchTask: " + task.getTaskID()); actions.add(new
-         * LaunchTaskAction(task)); } }
-         */
+        LOG.info(groomStatus);
+        List<Task> tasks = taskScheduler.assignTasks(groomStatus);
+        for (Task task : tasks) {
+          if (tasks != null) {
+            expireLaunchingTasks.addNewTask(task.getTaskID());
+            actions.add(new LaunchTaskAction(task));
+          }
+        }
       }
     }
 
+    // Check for tasks to be killed
+    List<GroomServerAction> killTasksList = getTasksToKill(groomName);
+    if (killTasksList != null) {
+      actions.addAll(killTasksList);
+    }
+
+    response.setActions(actions.toArray(new GroomServerAction[actions.size()]));
+
+    groomToHeartbeatResponseMap.put(groomName, response);
+    removeMarkedTasks(groomName);
+    
     return response;
   }
 
+
+  // (trackerID -> TreeSet of completed taskids running at that tracker)
+  TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap<String, Set<String>>();
+  
+  private void removeMarkedTasks(String groomName) {
+    // Purge all the 'marked' tasks which were running at taskTracker
+    TreeSet<String> markedTaskSet = 
+      (TreeSet<String>) trackerToMarkedTasksMap.get(groomName);
+    if (markedTaskSet != null) {
+      for (String taskid : markedTaskSet) {
+        removeTaskEntry(taskid);
+        LOG.info("Removed completed task '" + taskid + "' from '" + 
+            groomName + "'");
+      }
+      // Clear 
+      trackerToMarkedTasksMap.remove(groomName);
+    }
+  }
+
+  private void removeTaskEntry(String taskid) {
+    // taskid --> tracker
+    String tracker = taskidToTrackerMap.remove(taskid);
+
+    // tracker --> taskid
+    if (tracker != null) {
+        TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
+        if (trackerSet != null) {
+            trackerSet.remove(taskid);
+        }
+    }
+
+    // taskid --> TIP
+    taskidToTIPMap.remove(taskid);
+    
+    LOG.debug("Removing task '" + taskid + "'");
+  }
+
+  private List<GroomServerAction> getTasksToKill(String groomName) {
+    Set<String> taskIds = (TreeSet<String>) trackerToTaskMap.get(groomName);
+    if (taskIds != null) {
+        List<GroomServerAction> killList = new ArrayList<GroomServerAction>();
+        Set<String> killJobIds = new TreeSet<String>(); 
+        for (String killTaskId : taskIds ) {
+            TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(killTaskId);
+            if (tip.shouldCloseForClosedJob(killTaskId)) {
+                // 
+                // This is how the JobTracker ends a task at the TaskTracker.
+                // It may be successfully completed, or may be killed in
+                // mid-execution.
+                //
+                if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) {
+                    killList.add(new KillTaskAction(killTaskId));
+                    LOG.debug(groomName + " -> KillTaskAction: " + killTaskId);
+                } else {
+                    String killJobId = tip.getJob().getStatus().getJobID().getJtIdentifier(); 
+                    killJobIds.add(killJobId);
+                }
+            }
+        }
+        
+        for (String killJobId : killJobIds) {
+            killList.add(new KillJobAction(killJobId));
+            LOG.debug(groomName + " -> KillJobAction: " + killJobId);
+        }
+
+        return killList;
+    }
+    return null;
+
+  }
+
   /**
    * Process incoming heartbeat messages from the groom.
    */
@@ -463,17 +587,15 @@ public class BSPMaster extends Thread im
   }
 
   @Override
-  public JobStatus submitJob(BSPJobID jobId) throws IOException {
-    LOG.info("Submitted a job (" + jobId + ")");
-    if (jobs.containsKey(jobId)) {
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+    if (jobs.containsKey(jobID)) {
       // job already running, don't start twice
-      LOG.info("The job (" + jobId + ") is already subbmitted");
-      return jobs.get(jobId).getStatus();
+      LOG.info("The job (" + jobID + ") is already subbmitted");
+      return jobs.get(jobID).getStatus();
     }
 
-    JobInProgress job = new JobInProgress(jobId, this, this.conf);
-
-    return addJob(jobId, job);
+    JobInProgress job = new JobInProgress(jobID, this, this.conf);
+    return addJob(jobID, job);
   }
 
   @Override
@@ -502,7 +624,6 @@ public class BSPMaster extends Thread im
       jobs.put(job.getProfile().getJobID(), job);
       taskScheduler.addJob(job);
     }
-
     return job.getStatus();
   }
 
@@ -589,9 +710,56 @@ public class BSPMaster extends Thread im
     this.interTrackerServer.stop();
   }
 
-  @Override
-  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
+  TreeMap<String, String> taskidToTrackerMap = new TreeMap<String, String>();
+  TreeMap<String, TreeSet<String>> trackerToTaskMap = new TreeMap<String, TreeSet<String>>();
+  Map<String, TaskInProgress> taskidToTIPMap = new TreeMap<String, TaskInProgress>();
+  
+  public void createTaskEntry(String taskid, String groomServer,
+      TaskInProgress taskInProgress) {
+    LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId() + ", for tracker '" + groomServer + "'");
+    /*
+    // taskid --> groom
+    taskidToTrackerMap.put(taskid, groomServer);
+    // groom --> taskid
+    TreeSet<String> taskset = null;
+    if(trackerToTaskMap.entrySet().size() > 0) {
+      taskset = trackerToTaskMap.get(groomServer);
+      LOG.info(taskset.size());
+      LOG.info(taskset.size());
+      LOG.info(taskset.size());
+    }
+    
+    if (taskset == null) {
+        taskset = new TreeSet<String>();
+        trackerToTaskMap.put(groomServer, taskset);
+    }
+    taskset.add(taskid);
+    taskidToTIPMap.put(taskid, taskInProgress);
+    
+    LOG.info("" + taskidToTrackerMap);
+    LOG.info("" + taskidToTIPMap);
+    */
+  }
+
+  public static void main(String[] args) {
+    StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
+    if (args.length != 0) {
+      System.out.println("usage: HamaMaster");
+      System.exit(-1);
+    }
+
+    try {
+      HamaConfiguration conf = new HamaConfiguration();
+      conf.addResource(new Path(
+          "/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
+      conf.addResource(new Path(
+          "/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+
+      BSPMaster master = startMaster(conf);
+      master.offerService();
+    } catch (Throwable e) {
+      LOG.fatal(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Thu Jul  1 05:25:54 2010
@@ -58,7 +58,8 @@ public class BSPPeer implements Watcher,
   protected final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
   protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
   protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
-
+  protected int id;
+ 
   /**
    * 
    */
@@ -69,6 +70,7 @@ public class BSPPeer implements Watcher,
         + ":" + conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
     bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
     bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+    id = conf.getInt(Constants.PEER_ID, 0);
     bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
         Constants.DEFAULT_ZOOKEEPER_ROOT);
     zookeeperAddr = conf.get(Constants.ZOOKEEPER_SERVER_ADDRS,
@@ -242,4 +244,9 @@ public class BSPPeer implements Watcher,
   public String getServerName() {
     return this.serverName;
   }
+  
+  public int getId() {
+    return this.id;  
+  }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java Thu Jul  1 05:25:54 2010
@@ -13,7 +13,8 @@ public class BSPRunner extends Thread im
   private BSPPeer bspPeer;
   private Configuration conf;
   private BSP bsp;
-
+  private boolean isDone;
+  
   public void run() {
     try {
       bsp.bsp(bspPeer);
@@ -39,4 +40,8 @@ public class BSPRunner extends Thread im
     bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
         BSP.class), conf);
   }
+  
+  public boolean isDone() {
+    return this.isDone;
+  }
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=959515&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Thu Jul  1 05:25:54 2010
@@ -0,0 +1,17 @@
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BSPTask extends Task {
+  
+  public BSPTask(String jobId, String jobFile, String taskid, int partition, Configuration conf) {
+    this.jobId = jobId;
+    this.jobFile = jobFile;
+    this.taskId = taskid;
+    this.partition = partition;
+    this.runner = (BSPRunner) ReflectionUtils.newInstance(
+        BSPRunner.class, conf);
+  }
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Thu Jul  1 05:25:54 2010
@@ -27,6 +27,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,6 +88,9 @@ public class GroomServer implements Runn
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<BSPJobID, RunningJob> runningJobs = null;
 
+  private BlockingQueue<GroomServerAction> tasksToCleanup = 
+    new LinkedBlockingQueue<GroomServerAction>();
+  
   public GroomServer(Configuration conf) throws IOException {
     this.conf = conf;
     bspMasterAddr = BSPMaster.getAddress(conf);
@@ -203,6 +208,23 @@ public class GroomServer implements Runn
 
         // Send the heartbeat and process the bspmaster's directives
         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
+        
+        GroomServerAction[] actions = heartbeatResponse.getActions();
+        LOG.info("Got heartbeatResponse from BSPMaster with responseId: " + 
+            heartbeatResponse.getResponseId() + " and " + 
+            ((actions != null) ? actions.length : 0) + " actions");
+
+        
+        if (actions != null){ 
+          for(GroomServerAction action: actions) {
+            if (action instanceof LaunchTaskAction) {
+              startNewTask((LaunchTaskAction) action);
+            } else {
+              tasksToCleanup.put(action);
+            }
+          }
+        }
+        
         //
         // The heartbeat got through successfully!
         //
@@ -236,8 +258,17 @@ public class GroomServer implements Runn
     return State.NORMAL;
   }
 
-  private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+  private void startNewTask(LaunchTaskAction action) {
+    // TODO Auto-generated method stub
+    Task t = action.getTask();
+    LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+    LOG.info(t.runner);
+    //t.runner.start();
+    // TODO: execute task
     
+  }
+
+  private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
     // 
     // Check if the last heartbeat got through... 
     // if so then build the heartbeat information for the BSPMaster;
@@ -273,7 +304,9 @@ public class GroomServer implements Runn
       initialize();
       startCleanupThreads();
       boolean denied = false;
+      LOG.info("Why? " + running + ", " + shuttingDown + ", " + denied);
       while (running && !shuttingDown && !denied) {
+     
         boolean staleState = false;
         try {
           while (running && !staleState && !shuttingDown && !denied) {
@@ -326,27 +359,6 @@ public class GroomServer implements Runn
     RPC.stopProxy(jobClient);
   }
 
-  public static void main(String[] args) {
-    StringUtils.startupShutdownMessage(GroomServer.class, args, LOG);
-    if (args.length != 0) {
-      System.out.println("usage: GroomServer");
-      System.exit(-1);
-    }
-
-    try {
-      Configuration conf = new HamaConfiguration();
-      conf.set("bsp.master.port", "40000");
-      conf.set("bsp.groom.port", "40020");
-      conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local");
-      conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system");
-      GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
-      startGroomServer(groom);
-    } catch (Throwable e) {
-      LOG.fatal(StringUtils.stringifyException(e));
-      System.exit(-1);
-    }
-  }
-  
   public static Thread startGroomServer(final GroomServer hrs) {
     return startGroomServer(hrs,
       "regionserver" + hrs.groomServerName);
@@ -425,4 +437,25 @@ public class GroomServer implements Runn
         "Master: " + groomServerClass.toString(), e);
     }
   }
+
+  public static void main(String[] args) {
+    StringUtils.startupShutdownMessage(GroomServer.class, args, LOG);
+    if (args.length != 0) {
+      System.out.println("usage: GroomServer");
+      System.exit(-1);
+    }
+
+    try {
+      Configuration conf = new HamaConfiguration();
+      conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
+      conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+      
+      GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
+      startGroomServer(groom);
+    } catch (Throwable e) {
+      LOG.fatal(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
+  
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Thu Jul  1 05:25:54 2010
@@ -144,7 +144,7 @@ public class GroomServerStatus implement
     Text.writeString(out, host);
     out.writeInt(failures);
     out.writeInt(maxTasks);
-    out.writeInt(taskReports.size());    
+    out.writeInt(taskReports.size());
     for(TaskStatus taskStatus : taskReports) {
       taskStatus.write(out);
     }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Thu Jul  1 05:25:54 2010
@@ -45,6 +45,7 @@ class JobInProgress {
 
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
 
+  Configuration conf;
   JobProfile profile;
   JobStatus status;
   Path jobFile = null;
@@ -62,6 +63,7 @@ class JobInProgress {
 
   public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
       throws IOException {
+    this.conf = conf;
     this.jobId = jobId;
 
     this.master = master;
@@ -80,10 +82,10 @@ class JobInProgress {
     fs.copyToLocalFile(jobFile, localJobFile);
     BSPJobContext job = new BSPJobContext(localJobFile, jobId);
 
-    System.out.println("user:" + job.getUser());
-    System.out.println("jobId:" + jobId);
-    System.out.println("jobFile:" + jobFile.toString());
-    System.out.println("jobName:" + job.getJobName());
+    LOG.info("user:" + job.getUser());
+    LOG.info("jobId:" + jobId);
+    LOG.info("jobFile:" + jobFile.toString());
+    LOG.info("jobName:" + job.getJobName());
 
     this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
         .getJobName());
@@ -135,7 +137,16 @@ class JobInProgress {
   // ///////////////////////////////////////////////////
   public synchronized Task obtainNewTask(GroomServerStatus status,
       int clusterSize, int numUniqueHosts) {
+    Task result = null;
+    try {
+      result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this,
+          numUniqueHosts).getTaskToRun(status);
+      LOG.info("JobInProgress: " + result.getJobID() + ", " + result.getJobFile() + ", " + result.getId() + ", " + result.getPartition());
+      
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
 
-    return null;
+    return result;
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java Thu Jul  1 05:25:54 2010
@@ -21,6 +21,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.io.Text;
+
 /**
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
  * {@link org.apache.hama.bsp.GroomServer} to kill the task of a job and cleanup
@@ -28,30 +30,30 @@ import java.io.IOException;
  * 
  */
 class KillJobAction extends GroomServerAction {
-  final BSPJobID jobId;
+  String jobId;
 
   public KillJobAction() {
     super(ActionType.KILL_JOB);
-    jobId = new BSPJobID();
+    jobId = new String();
   }
 
-  public KillJobAction(BSPJobID jobId) {
+  public KillJobAction(String killJobId) {
     super(ActionType.KILL_JOB);
-    this.jobId = jobId;
+    this.jobId = killJobId;
   }
 
-  public BSPJobID getJobID() {
+  public String getJobID() {
     return jobId;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    jobId.write(out);
+    Text.writeString(out, jobId);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    jobId.readFields(in);
+    jobId = Text.readString(in);
   }
 
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java Thu Jul  1 05:25:54 2010
@@ -21,6 +21,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.io.Text;
+
 
 /**
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} 
@@ -28,29 +30,29 @@ import java.io.IOException;
  * 
  */
 class KillTaskAction extends GroomServerAction {
-  final TaskAttemptID taskId;
+  String taskId;
   
   public KillTaskAction() {
     super(ActionType.KILL_TASK);
-    taskId = new TaskAttemptID();
+    taskId = new String();
   }
   
-  public KillTaskAction(TaskAttemptID taskId) {
+  public KillTaskAction(String killTaskId) {
     super(ActionType.KILL_TASK);
-    this.taskId = taskId;
+    this.taskId = killTaskId;
   }
 
-  public TaskAttemptID getTaskID() {
+  public String getTaskID() {
     return taskId;
   }
   
   @Override
   public void write(DataOutput out) throws IOException {
-    taskId.write(out);
+    Text.writeString(out, taskId);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    taskId.readFields(in);
+    taskId = Text.readString(in);
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Thu Jul  1 05:25:54 2010
@@ -9,7 +9,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.ipc.InterTrackerProtocol;
 import org.apache.hama.ipc.JobSubmissionProtocol;
@@ -102,12 +101,6 @@ public class LocalJobRunner implements J
     }
   }
 
-  @Override
-  public JobStatus submitJob(BSPJobID jobName) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
   /**
    * Local Job
    */
@@ -116,16 +109,17 @@ public class LocalJobRunner implements J
     private Configuration conf;
     private int NUM_PEER;
     private BSPJob job;
+    private String jobFile;
     private boolean threadDone = false;
-    private HashMap<String, BSPRunner> tasks = new HashMap<String, BSPRunner>();
+    private HashMap<String, Task> tasks = new HashMap<String, Task>();
 
     public Job(BSPJobID jobID, String jobFile, Configuration conf)
         throws IOException {
       this.conf = conf;
+      this.jobFile = jobFile;
       this.NUM_PEER = conf.getInt("bsp.peers.num", 0);
       LOG.info("LocalJobRunner: " + jobID + ", " + jobFile);
       this.job = new BSPJob(jobID, jobFile);
-      LOG.info("Jar file: " + job.getJar());
       LOG.info("Number of BSP tasks: " + NUM_PEER);
       jobs.put(jobID.toString(), this);
 
@@ -158,17 +152,21 @@ public class LocalJobRunner implements J
         TaskID tID;
         for (int i = 0; i < NUM_PEER; i++) {
           this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
-          BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance(
-              BSPRunner.class, this.conf);
+          this.conf.setInt(Constants.PEER_ID, i);
           tID = new TaskID(job.getJobID(), false, i);
-          tasks.put(tID.toString(), runner);
+
+          Task bspRunner = new BSPTask(job.getJobID().getJtIdentifier(), jobFile, tID.toString(), i, this.conf);
+          LOG.info("Adding task '" + tID.toString() + "' for '" + bspRunner.getName() + "'");
+          tasks.put(tID.toString(), bspRunner);
         }
 
-        for (Map.Entry<String, BSPRunner> e : tasks.entrySet()) {
-          e.getValue().start();
+        // Launching tasks
+        for (Map.Entry<String, Task> e : tasks.entrySet()) {
+          e.getValue().runner.start();
         }
 
-        for (Map.Entry<String, BSPRunner> e : tasks.entrySet()) {
+        // Barrier
+        for (Map.Entry<String, Task> e : tasks.entrySet()) {
           try {
             e.getValue().join();
           } catch (InterruptedException e1) {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Thu Jul  1 05:25:54 2010
@@ -54,13 +54,11 @@ class SimpleTaskScheduler extends TaskSc
   public List<Task> assignTasks(GroomServerStatus groomStatus)
       throws IOException {
     ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
-
+    
     final int numGroomServers = clusterStatus.getGroomServers();
     // final int clusterTaskCapacity = clusterStatus.getMaxTasks();
 
-    //
     // Get task counts for the current groom.
-    //
     // final int groomTaskCapacity = groom.getMaxTasks();
     final int groomRunningTasks = groomStatus.countTasks();
 
@@ -73,14 +71,19 @@ class SimpleTaskScheduler extends TaskSc
       // instance to the scheduler.
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
+          /*
           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
             continue;
           }
+          */
 
           Task t = null;
 
           t = job.obtainNewTask(groomStatus, numGroomServers,
               groomServerManager.getNumberOfUniqueHosts());
+          
+          LOG.info("SimpleTaskScheduler: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+          
           if (t != null) {
             assignedTasks.add(t);
             break; // TODO - Now, simple scheduler assigns only one task to

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Thu Jul  1 05:25:54 2010
@@ -21,6 +21,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -28,23 +30,28 @@ import org.apache.hadoop.io.Writable;
 /**
  *
  */
-public class Task implements Writable {
+public class Task extends Thread implements Writable {
+  public static final Log LOG = LogFactory.getLog(Task.class);
   ////////////////////////////////////////////
   // Fields
   ////////////////////////////////////////////
-  private String jobFile;
-  private TaskAttemptID taskId;
-  private int partition;
   
+  protected String jobId;
+  protected String jobFile;
+  protected String taskId;
+  protected int partition;
+  
+  protected BSPRunner runner;
   protected LocalDirAllocator lDirAlloc;
   /**
    * 
    */
   public Task() {
-    taskId = new TaskAttemptID();
+    taskId = new String();
   }
   
-  public Task(String jobFile, TaskAttemptID taskId, int partition) {
+  public Task(String jobId, String jobFile, String taskId, int partition) {
+    this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskId;
      
@@ -62,7 +69,7 @@ public class Task implements Writable {
     return jobFile; 
   }
   
-  public TaskAttemptID getTaskID() {
+  public String getTaskID() {
     return taskId;
   }
   
@@ -70,8 +77,8 @@ public class Task implements Writable {
    * Get the job name for this task.
    * @return the job name
    */
-  public BSPJobID getJobID() {
-    return taskId.getJobID();
+  public String getJobID() {
+    return jobId;
   }
   
   /**
@@ -92,15 +99,18 @@ public class Task implements Writable {
   ////////////////////////////////////////////
   @Override
   public void write(DataOutput out) throws IOException {
+    Text.writeString(out, jobId);
     Text.writeString(out, jobFile);
-    taskId.write(out);
+    Text.writeString(out, taskId);
     out.writeInt(partition);
   }
   
   @Override
   public void readFields(DataInput in) throws IOException {
+    jobId = Text.readString(in);
     jobFile = Text.readString(in);
-    taskId.readFields(in);
+    taskId = Text.readString(in);
     partition = in.readInt();
   }
+  
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Thu Jul  1 05:25:54 2010
@@ -17,10 +17,15 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.IOException;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hama.Constants;
 
 /**
  *
@@ -28,7 +33,7 @@ import org.apache.commons.logging.LogFac
 class TaskInProgress {
   public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
 
-  private BSPJobContext context;
+  private Configuration conf;
 
   // Constants
   static final int MAX_TASK_EXECS = 1;
@@ -58,7 +63,7 @@ class TaskInProgress {
 
   // Map from task Id -> GroomServer Id, contains tasks that are
   // currently runnings
-  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+  private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
   // All attempt Ids of this TIP
   // private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
   /**
@@ -66,15 +71,45 @@ class TaskInProgress {
    */
   private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
 
+  private BSPJobID jobId;
+
   public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
-      BSPJobContext context, JobInProgress job, int partition) {
+      Configuration conf, JobInProgress job, int partition) {
+    this.jobId = jobId;
     this.jobFile = jobFile;
     this.bspMaster = master;
     this.job = job;
-    this.context = context;
+    this.conf = conf;
     this.partition = partition;
+    
+    this.id = new TaskID(jobId, true, partition);
   }
 
+  /**
+   * Return a Task that can be sent to a GroomServer for execution.
+   */
+  public Task getTaskToRun(GroomServerStatus status) throws IOException {
+      Task t = null;
+      
+      String taskid = null;
+      if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
+        taskid = new String("task_" + nextTaskId);
+        ++nextTaskId;
+      } else {
+        LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) + 
+                " attempts for the tip '" + getTIPId() + "'");
+        return null;
+      }
+
+      //this.conf.set(Constants.PEER_PORT, String.valueOf(30000));
+      t = new BSPTask(jobId.getJtIdentifier(), jobFile, taskid, partition, this.conf);
+      activeTasks.put(taskid, status.getGroomName());
+
+      // Ask JobTracker to note that the task exists
+      bspMaster.createTaskEntry(taskid, status.getGroomName(), this);
+      return t;
+  }
+  
   // //////////////////////////////////
   // Accessors
   // //////////////////////////////////
@@ -123,4 +158,18 @@ class TaskInProgress {
   public synchronized boolean isComplete() {
     return (completes > 0);
   }
+
+  private TreeSet tasksReportedClosed = new TreeSet();
+  
+  public boolean shouldCloseForClosedJob(String taskid) {
+    TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
+    if ((ts != null) &&
+        (! tasksReportedClosed.contains(taskid)) &&
+        (job.getStatus().getRunState() != JobStatus.RUNNING)) {
+        tasksReportedClosed.add(taskid);
+        return true;
+    }  else {
+        return false;
+    }
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java Thu Jul  1 05:25:54 2010
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.mortbay.log.Log;
 
 /**
  * Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer}
@@ -43,6 +44,7 @@ abstract class TaskScheduler implements 
 
   public synchronized void setGroomServerManager(
       GroomServerManager groomServerManager) {
+    Log.info("TaskScheduler.setGroomServermanager()");
     this.groomServerManager = groomServerManager;
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java Thu Jul  1 05:25:54 2010
@@ -44,11 +44,14 @@ public interface JobSubmissionProtocol e
    * that job. 
    * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
    *
-   * @param jobName
+   * @param jobID
+   * @param jobFile
    * @return jobStatus
    * @throws IOException
    */
-  public JobStatus submitJob(BSPJobID jobName) throws IOException;
+  //public JobStatus submitJob(BSPJobID jobName) throws IOException;
+
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
   
   /**
    * Get the current status of the cluster
@@ -104,7 +107,5 @@ public interface JobSubmissionProtocol e
    */ 
   public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
 
-
-  JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
   
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java Thu Jul  1 05:25:54 2010
@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMaster;
 import org.apache.hama.bsp.GroomServer;
 import org.apache.log4j.Logger;
@@ -76,7 +77,7 @@ public class ClusterUtil {
   public static String startup(final BSPMaster m,
       final List<ClusterUtil.GroomServerThread> groomservers, Configuration conf) throws IOException, InterruptedException {
     if (m != null) {
-      m.start();
+      m.startMaster((HamaConfiguration) conf);
     }
 
     if (groomservers != null) {