You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/01/12 13:32:42 UTC

svn commit: r1058109 [1/2] - in /incubator/hama/trunk: ./ conf/ src/java/org/apache/hama/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/ src/test/org/apache/hama/ src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Wed Jan 12 12:32:39 2011
New Revision: 1058109

URL: http://svn.apache.org/viewvc?rev=1058109&view=rev
Log:
Modify MiniCluster so that developers can benefit when testing using JUnit

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/MasterProtocol.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/Client.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMaster.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/build.xml
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
    incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java
    incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
    incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Jan 12 12:32:39 2011
@@ -51,6 +51,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
  
+    HAMA-346: Modify MiniCluster so that developers can benefit when testing using JUnit
+                       (ChiaHung Lin via edwardyoon)
     HAMA-340: Implementation of job submit command (edwardyoon)   
     HAMA-278: Few minor refactoring (edwardyoon)
     HAMA-336: The all taskid variable's type should be declared as a TaskAttemptID

Modified: incubator/hama/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/build.xml?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/build.xml (original)
+++ incubator/hama/trunk/build.xml Wed Jan 12 12:32:39 2011
@@ -257,6 +257,10 @@
 	<!-- Run unit tests                                                     -->
 	<!-- ================================================================== -->
 	<path id="test.classpath">
+		<fileset dir="${lib.dir}/jsp-2.1/">
+			<include name="*.jar" />
+		</fileset>
+		<pathelement location="${conf.dir}" />
 		<pathelement location="${src.test}" />
 		<pathelement location="${build.test}" />
 		<path refid="classpath" />
@@ -281,9 +285,12 @@
 		<junit printsummary="yes" showoutput="${test.output}" haltonfailure="no" fork="yes" maxmemory="512m" errorProperty="tests.failed" failureProperty="tests.failed" timeout="${test.timeout}">
 			<classpath refid="test.classpath" />
 			<formatter type="${test.junit.output.format}" />
-			<batchtest todir="${build.report.tests}">
+			<batchtest todir="${build.report.tests}" unless="testcase">
 				<fileset dir="${src.test}" includes="**/Test*.java" excludes="**/${test.exclude}.java" />
 			</batchtest>
+			<batchtest todir="${build.report.tests}" if="testcase">
+				<fileset dir="${src.test}" includes="**/${testcase}.java" excludes="**/${test.exclude}.java" />
+			</batchtest>
 		</junit>
 		<fail if="tests.failed">Tests failed!</fail>
 	</target>

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Wed Jan 12 12:32:39 2011
@@ -41,6 +41,11 @@
     <description>The port an groom server binds to.</description>
   </property>
   <property>
+    <name>bsp.groom.rpc.port</name>
+     <value>50000</value>
+    <description>The port a groom RPC server binds to.</description>
+  </property>
+  <property>
     <name>bsp.local.dir</name>
     <value>${hadoop.tmp.dir}/bsp/local</value>
     <description>local directory for temporal store</description> 

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Wed Jan 12 12:32:39 2011
@@ -23,6 +23,16 @@ package org.apache.hama;
  * Some constants used in the Hama
  */
 public interface Constants {
+ 
+  public  static final String GROOM_RPC_HOST = "bsp.groom.rpc.hostname";
+
+  public static final String DEFAULT_GROOM_RPC_HOST = "0.0.0.0";
+
+  public static final String GROOM_RPC_PORT = "bsp.groom.rpc.port";
+
+  /** Default port the groom RPC server listens on. */
+  public static final int DEFAULT_GROOM_RPC_PORT = 50000;
+  
 
   ///////////////////////////////////////
   // Constants for BSP Package

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Wed Jan 12 12:32:39 2011
@@ -42,14 +42,14 @@ public class BSPJob extends BSPJobContex
     super(conf, null);
     jobClient = new BSPJobClient(conf);
   }
-  
+
   public BSPJob(HamaConfiguration conf, String jobName) throws IOException {
     this(conf);
     setJobName(jobName);
   }
 
   public BSPJob(BSPJobID jobID, String jobFile) throws IOException {
-     super(new Path(jobFile), jobID);
+    super(new Path(jobFile), jobID);
   }
 
   /**
@@ -65,7 +65,7 @@ public class BSPJob extends BSPJobContex
     super(conf, null);
     setNumBspTask(tasks);
   }
-  
+
   @SuppressWarnings("unchecked")
   public BSPJob(HamaConfiguration conf, Class exampleClass) throws IOException {
     this(conf);
@@ -78,10 +78,10 @@ public class BSPJob extends BSPJobContex
           + " instead of " + state);
     }
   }
-  
-  ///////////////////////////////////////
+
+  // /////////////////////////////////////
   // Setter for Job Submission
-  ///////////////////////////////////////
+  // /////////////////////////////////////
   public void setWorkingDirectory(Path dir) throws IOException {
     ensureState(JobState.DEFINE);
     dir = new Path(getWorkingDirectory(), dir);
@@ -99,7 +99,7 @@ public class BSPJob extends BSPJobContex
     ensureState(JobState.DEFINE);
     conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
   }
-  
+
   @SuppressWarnings("unchecked")
   public Class<? extends BSP> getBspClass() {
     return (Class<? extends BSP>) conf.getClass(WORK_CLASS_ATTR, BSP.class);
@@ -115,7 +115,7 @@ public class BSPJob extends BSPJobContex
       conf.set("bsp.jar", jar);
     }
   }
-  
+
   @SuppressWarnings("unchecked")
   private static String findContainingJar(Class my_class) {
     ClassLoader loader = my_class.getClassLoader();
@@ -144,18 +144,18 @@ public class BSPJob extends BSPJobContex
     ensureState(JobState.DEFINE);
     conf.set("bsp.job.name", name);
   }
-  
+
   public void setInputPath(HamaConfiguration conf, Path iNPUTPATH) {
-        
+
   }
 
   public void setUser(String user) {
     conf.set("user.name", user);
   }
-  
-  ///////////////////////////////////////
+
+  // /////////////////////////////////////
   // Methods for Job Control
-  ///////////////////////////////////////
+  // /////////////////////////////////////
   public float progress() throws IOException {
     ensureState(JobState.RUNNING);
     return info.progress();

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Wed Jan 12 12:32:39 2011
@@ -23,16 +23,14 @@ import java.net.InetSocketAddress;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,14 +44,15 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.ipc.InterServerProtocol;
 import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.ipc.WorkerProtocol;
 
 /**
  * BSPMaster is responsible to control all the groom servers and to manage bsp
  * jobs.
  */
-public class BSPMaster implements JobSubmissionProtocol, InterServerProtocol,
+public class BSPMaster implements JobSubmissionProtocol, MasterProtocol, // InterServerProtocol,
     GroomServerManager {
   public static final Log LOG = LogFactory.getLog(BSPMaster.class);
 
@@ -73,7 +72,8 @@ public class BSPMaster implements JobSub
 
   // Attributes
   String masterIdentifier;
-  private Server interServer;
+  // private Server interServer;
+  private Server masterServer;
 
   // Filesystem
   static final String SUBDIR = "bspMaster";
@@ -86,28 +86,21 @@ public class BSPMaster implements JobSub
   final static FsPermission SYSTEM_FILE_PERMISSION = FsPermission
       .createImmutable((short) 0700); // rwx------
 
-  // Groom Servers
-  // (groom name --> last sent HeartBeatResponse)
-  Map<String, HeartbeatResponse> groomToHeartbeatResponseMap = new TreeMap<String, HeartbeatResponse>();
-  private HashMap<String, GroomServerStatus> groomServers = new HashMap<String, GroomServerStatus>();
-  // maps groom server names to peer names
-  private HashMap<String, String> groomServerPeers = new HashMap<String, String>();
-
   // Jobs' Meta Data
   private Integer nextJobId = Integer.valueOf(1);
   // private long startTime;
-  private int totalSubmissions = 0;
-  private int totalTasks = 0;
-  private int totalTaskCapacity;
+  private int totalSubmissions = 0; // how many jobs have been submitted by
+  // clients
+  private int totalTasks = 0; // currently running tasks
+  private int totalTaskCapacity; // max tasks that groom server can run
+
   private Map<BSPJobID, JobInProgress> jobs = new TreeMap<BSPJobID, JobInProgress>();
   private TaskScheduler taskScheduler;
 
-  TreeMap<TaskAttemptID, String> taskIdToGroomNameMap = new TreeMap<TaskAttemptID, String>();
-  TreeMap<String, TreeSet<TaskAttemptID>> groomNameToTaskIdsMap = new TreeMap<String, TreeSet<TaskAttemptID>>();
-  Map<TaskAttemptID, TaskInProgress> taskIdToTaskInProgressMap = new TreeMap<TaskAttemptID, TaskInProgress>();
+  // GroomServers cache
+  protected ConcurrentMap<GroomServerStatus, WorkerProtocol> groomServers = new ConcurrentHashMap<GroomServerStatus, WorkerProtocol>();
 
-  Vector<JobInProgress> jobInitQueue = new Vector<JobInProgress>();
-  JobInitThread initJobs = new JobInitThread();
+  private final List<JobInProgressListener> jobInProgressListeners = new CopyOnWriteArrayList<JobInProgressListener>();
 
   /**
    * Start the BSPMaster process, listen on the indicated hostname/port
@@ -123,16 +116,17 @@ public class BSPMaster implements JobSub
     this.masterIdentifier = identifier;
     // expireLaunchingTaskThread.start();
 
-    // Create the scheduler
+    // Create the scheduler and init scheduler services
     Class<? extends TaskScheduler> schedulerClass = conf.getClass(
         "bsp.master.taskscheduler", SimpleTaskScheduler.class,
         TaskScheduler.class);
     this.taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(
         schedulerClass, conf);
 
-    InetSocketAddress addr = getAddress(conf);
-    this.interServer = RPC.getServer(this, addr.getHostName(), addr
-        .getPort(), conf);
+    String host = getAddress(conf).getHostName();
+    int port = getAddress(conf).getPort();
+    LOG.info("RPC BSPMaster: host " + host + " port " + port);
+    this.masterServer = RPC.getServer(this, host, port, conf);
 
     while (!Thread.currentThread().isInterrupted()) {
       try {
@@ -175,63 +169,125 @@ public class BSPMaster implements JobSub
     deleteLocalFiles(SUBDIR);
   }
 
-  // /////////////////////////////////////////////////////
-  // Accessors for objects that want info on jobs, tasks,
-  // grooms, etc.
-  // /////////////////////////////////////////////////////
-  public GroomServerStatus getGroomServer(String groomID) {
-    synchronized (groomServers) {
-      return groomServers.get(groomID);
+  /**
+   * A GroomServer registers its status with the BSPMaster at startup, which
+   * updates the GroomServers cache.
+   * 
+   * @param status the groom server status to store in the cache.
+   * @return true if registration succeeds; false if it fails.
+   */
+  @Override
+  public boolean register(GroomServerStatus status) throws IOException {
+    if (null == status) {
+      LOG.error("No groom server status.");
+      throw new NullPointerException("No groom server status.");
     }
+    Throwable e = null;
+    try {
+      WorkerProtocol wc = (WorkerProtocol) RPC.waitForProxy(
+          WorkerProtocol.class, WorkerProtocol.versionID,
+          resolveWorkerAddress(status.getRpcServer()), this.conf);
+      if (null == wc) {
+        LOG
+            .warn("Fail to create Worker client at host "
+                + status.getPeerName());
+        return false;
+      }
+      // TODO: need to check if peer name has changed
+      groomServers.putIfAbsent(status, wc);
+    } catch (UnsupportedOperationException u) {
+      e = u;
+    } catch (ClassCastException c) {
+      e = c;
+    } catch (NullPointerException n) {
+      e = n;
+    } catch (IllegalArgumentException i) {
+      e = i;
+    } catch (Exception ex) {
+      e = ex;
+    }
+
+    if (null != e) {
+      LOG.error("Fail to register GroomServer " + status.getGroomName(), e);
+      return false;
+    }
+
+    return true;
   }
 
-  public List<String> groomServerNames() {
-    List<String> activeGrooms = new ArrayList<String>();
+  private static InetSocketAddress resolveWorkerAddress(String data) {
+    return new InetSocketAddress(data.split(":")[0], Integer.parseInt(data
+        .split(":")[1]));
+  }
+
+  private void updateGroomServersKey(GroomServerStatus old,
+      GroomServerStatus newKey) {
     synchronized (groomServers) {
-      for (GroomServerStatus status : groomServers.values()) {
-        activeGrooms.add(status.getGroomName());
-      }
+      WorkerProtocol worker = groomServers.remove(old);
+      groomServers.put(newKey, worker);
     }
-    return activeGrooms;
   }
 
-  // ///////////////////////////////////////////////////////////////
-  // Used to init new jobs that have just been created
-  // ///////////////////////////////////////////////////////////////
-  class JobInitThread implements Runnable {
-    private volatile boolean shouldRun = true;
-
-    public JobInitThread() {
-    }
-
-    public void run() {
-      while (shouldRun) {
-        JobInProgress job = null;
-        synchronized (jobInitQueue) {
-          if (jobInitQueue.size() > 0) {
-            job = (JobInProgress) jobInitQueue.elementAt(0);
-            jobInitQueue.remove(job);
-          } else {
-            try {
-              jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
-            } catch (InterruptedException iex) {
-            }
-          }
+  @Override
+  public boolean report(Directive directive) throws IOException {
+    // check returned directive type if equals response
+    if (directive.getType().value() != Directive.Type.Response.value()) {
+      throw new IllegalStateException("GroomServer should report()"
+          + " with Response. Current report type:" + directive.getType());
+    }
+    // update the GroomServerStatus held in the groomServers cache.
+    GroomServerStatus fstus = directive.getStatus();
+    // groomServers cache contains groom server status reported back
+    if (groomServers.containsKey(fstus)) {
+      GroomServerStatus ustus = null;
+      for (GroomServerStatus old : groomServers.keySet()) {
+        if (old.equals(fstus)) {
+          ustus = fstus;
+          updateGroomServersKey(old, ustus);
+          break;
         }
-        try {
-          if (job != null) {
-            job.initTasks();
+      }// for
+      if (null != ustus) {
+        List<TaskStatus> tlist = ustus.getTaskReports();
+        for (TaskStatus ts : tlist) {
+          JobInProgress jip = whichJob(ts.getJobId());
+          // TODO: need for each tip execute completed?
+          // each tip already maintain a data structure, checking
+          // if task status is completed
+          TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
+              .getTaskId()).getTaskID());
+          jip.completedTask(tip, ts);
+          LOG.info("JobInProgress id:" + jip.getJobID() + " status:"
+              + jip.getStatus());
+          if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+            for (JobInProgressListener listener : jobInProgressListeners) {
+              try {
+                listener.jobRemoved(jip);
+              } catch (IOException ioe) {
+                LOG.error("Fail to alter scheduler a job is moved.", ioe);
+              }
+            }
           }
-        } catch (Exception e) {
-          LOG.warn("job init failed: " + e);
-          job.kill();
         }
+      } else {
+        throw new RuntimeException("BSPMaster contains GroomServerSatus, "
+            + "but fail to retrieve it.");
       }
+    } else {
+      throw new RuntimeException("GroomServer not found."
+          + fstus.getGroomName());
     }
+    return true;
+  }
 
-    public void stopIniter() {
-      shouldRun = false;
+  private JobInProgress whichJob(BSPJobID id) {
+    for (JobInProgress job : taskScheduler
+        .getJobs(SimpleTaskScheduler.PROCESSING_QUEUE)) {
+      if (job.getJobID().equals(id)) {
+        return job;
+      }
     }
+    return null;
   }
 
   // /////////////////////////////////////////////////////////////
@@ -283,6 +339,7 @@ public class BSPMaster implements JobSub
 
     BSPMaster result = new BSPMaster(conf, identifier);
     result.taskScheduler.setGroomServerManager(result);
+    result.taskScheduler.start();
 
     return result;
   }
@@ -295,26 +352,27 @@ public class BSPMaster implements JobSub
   }
 
   /**
+   * BSPMaster identifier
    * 
-   * @return
+   * @return String BSPMaster identification number
    */
   private static String generateNewIdentifier() {
     return new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
   }
 
   public void offerService() throws InterruptedException, IOException {
-    new Thread(this.initJobs).start();
-    LOG.info("Starting jobInitThread");
-
-    this.interServer.start();
+    // this.interServer.start();
+    this.masterServer.start();
 
     synchronized (this) {
       state = State.RUNNING;
     }
     LOG.info("Starting RUNNING");
 
-    this.interServer.join();
-    LOG.info("Stopped interServer");
+    // this.interServer.join();
+    this.masterServer.join();
+
+    LOG.info("Stopped RPC Master server.");
   }
 
   // //////////////////////////////////////////////////
@@ -323,8 +381,8 @@ public class BSPMaster implements JobSub
   @Override
   public long getProtocolVersion(String protocol, long clientVersion)
       throws IOException {
-    if (protocol.equals(InterServerProtocol.class.getName())) {
-      return InterServerProtocol.versionID;
+    if (protocol.equals(MasterProtocol.class.getName())) {
+      return MasterProtocol.versionID;
     } else if (protocol.equals(JobSubmissionProtocol.class.getName())) {
       return JobSubmissionProtocol.versionID;
     } else {
@@ -332,185 +390,6 @@ public class BSPMaster implements JobSub
     }
   }
 
-  /**
-   * A RPC method for transmitting each peer status from peer to master.
-   * 
-   * @throws IOException
-   */
-  @Override
-  public HeartbeatResponse heartbeat(GroomServerStatus status,
-      boolean restarted, boolean initialContact, boolean acceptNewTasks,
-      short responseId, int reportSize) throws IOException {
-
-    // First check if the last heartbeat response got through
-    String groomName = status.getGroomName();
-    long now = System.currentTimeMillis();
-
-    HeartbeatResponse prevHeartbeatResponse = groomToHeartbeatResponseMap
-        .get(groomName);
-
-    // Process this heartbeat
-    short newResponseId = (short) (responseId + 1);
-    status.setLastSeen(now);
-    if (!processHeartbeat(status, initialContact)) {
-      if (prevHeartbeatResponse != null) {
-        groomToHeartbeatResponseMap.remove(groomName);
-      }
-      return new HeartbeatResponse(newResponseId,
-          new GroomServerAction[] { new ReinitGroomAction() }, Collections
-              .<String, String> emptyMap());
-    }
-
-    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null,
-        groomServerPeers);
-    List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
-
-    // Check for new tasks to be executed on the groom server
-    if (acceptNewTasks) {
-      GroomServerStatus groomStatus = getGroomServer(groomName);
-      if (groomStatus == null) {
-        LOG.warn("Unknown groom server polling; ignoring: " + groomName);
-      } else {
-        List<Task> taskList = taskScheduler.assignTasks(groomStatus);
-
-        for (Task task : taskList) {
-          if (task != null) {
-            actions.add(new LaunchTaskAction(task));
-          }
-        }
-      }
-    }
-
-    response.setActions(actions.toArray(new GroomServerAction[actions.size()]));
-
-    groomToHeartbeatResponseMap.put(groomName, response);
-    removeMarkedTasks(groomName);
-    updateTaskStatuses(status);
-
-    return response;
-  }
-
-  void updateTaskStatuses(GroomServerStatus status) {
-    for (Iterator<TaskStatus> it = status.taskReports(); it.hasNext();) {
-      TaskStatus report = it.next();
-      report.setGroomServer(status.getGroomName());
-      TaskAttemptID taskId = report.getTaskId();
-      TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap
-          .get(taskId);
-
-      if (tip == null) {
-        LOG.info("Serious problem.  While updating status, cannot find taskid "
-            + report.getTaskId());
-      } else {
-        JobInProgress job = tip.getJob();
-
-        if (report.getRunState() == TaskStatus.State.SUCCEEDED) {
-          job.completedTask(tip, report);
-        } else if (report.getRunState() == TaskStatus.State.FAILED) {
-          // TODO Tell the job to fail the relevant task
-
-        } else {
-          job.updateTaskStatus(tip, report);
-        }
-      }
-
-    }
-  }
-
-  // (trackerID -> TreeSet of completed taskids running at that tracker)
-  TreeMap<String, TreeSet<TaskAttemptID>> trackerToMarkedTasksMap = new TreeMap<String, TreeSet<TaskAttemptID>>();
-
-  private void removeMarkedTasks(String groomName) {
-    // Purge all the 'marked' tasks which were running at groomServer
-    TreeSet<TaskAttemptID> markedTaskSet = trackerToMarkedTasksMap
-        .get(groomName);
-    if (markedTaskSet != null) {
-      for (TaskAttemptID taskid : markedTaskSet) {
-        removeTaskEntry(taskid);
-        LOG.info("Removed completed task '" + taskid + "' from '" + groomName
-            + "'");
-      }
-      // Clear
-      trackerToMarkedTasksMap.remove(groomName);
-    }
-  }
-
-  private void removeTaskEntry(TaskAttemptID taskid) {
-    // taskid --> groom
-    String groom = taskIdToGroomNameMap.remove(taskid);
-
-    // groom --> taskid
-    if (groom != null) {
-      TreeSet<TaskAttemptID> groomSet = groomNameToTaskIdsMap.get(groom);
-      if (groomSet != null) {
-        groomSet.remove(taskid);
-      }
-    }
-
-    // taskid --> TIP
-    taskIdToTaskInProgressMap.remove(taskid);
-    LOG.debug("Removing task '" + taskid + "'");
-  }
-
-  private List<GroomServerAction> getTasksToKill(String groomName) {
-    Set<TaskAttemptID> taskIds = groomNameToTaskIdsMap.get(groomName);
-    if (taskIds != null) {
-      List<GroomServerAction> killList = new ArrayList<GroomServerAction>();
-      Set<String> killJobIds = new TreeSet<String>();
-      for (TaskAttemptID killTaskId : taskIds) {
-        TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap
-            .get(killTaskId);
-        if (tip.shouldCloseForClosedJob(killTaskId)) {
-          // 
-          // This is how the BSPMaster ends a task at the GroomServer.
-          // It may be successfully completed, or may be killed in
-          // mid-execution.
-          //
-          if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) {
-            killList.add(new KillTaskAction(killTaskId));
-            LOG.debug(groomName + " -> KillTaskAction: " + killTaskId);
-          } else {
-            String killJobId = tip.getJob().getStatus().getJobID()
-                .getJtIdentifier();
-            killJobIds.add(killJobId);
-          }
-        }
-      }
-
-      for (String killJobId : killJobIds) {
-        killList.add(new KillJobAction(killJobId));
-        LOG.debug(groomName + " -> KillJobAction: " + killJobId);
-      }
-
-      return killList;
-    }
-    return null;
-
-  }
-
-  /**
-   * Process incoming heartbeat messages from the groom.
-   */
-  private synchronized boolean processHeartbeat(GroomServerStatus groomStatus,
-      boolean initialContact) {
-    String groomName = groomStatus.getGroomName();
-
-    synchronized (groomServers) {
-      GroomServerStatus oldStatus = groomServers.get(groomName);
-      if (oldStatus == null) {
-        groomServers.put(groomName, groomStatus);
-      } else { // TODO - to be improved to update status.
-      }
-    }
-
-    if (initialContact) {
-      groomServerPeers.put(groomStatus.getGroomName(), groomStatus
-          .getPeerName());
-    }
-
-    return true;
-  }
-
   // //////////////////////////////////////////////////
   // JobSubmissionProtocol
   // //////////////////////////////////////////////////
@@ -535,23 +414,29 @@ public class BSPMaster implements JobSub
       return jobs.get(jobID).getStatus();
     }
 
-    JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this, this.conf);
+    JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this,
+        this.conf);
     return addJob(jobID, job);
   }
 
+  // //////////////////////////////////////////////////
+  // GroomServerManager functions
+  // //////////////////////////////////////////////////
+
   @Override
   public ClusterStatus getClusterStatus(boolean detailed) {
-    int numGroomServers;
     Map<String, String> groomPeersMap = null;
 
     // give the caller a snapshot of the cluster status
-    synchronized (this) {
-      numGroomServers = groomServerPeers.size();
-      if (detailed) {
-        groomPeersMap = new HashMap<String, String>(groomServerPeers);
+    int numGroomServers = groomServers.size();
+    if (detailed) {
+      groomPeersMap = new HashMap<String, String>();
+      for (Map.Entry<GroomServerStatus, WorkerProtocol> entry : groomServers
+          .entrySet()) {
+        GroomServerStatus s = entry.getKey();
+        groomPeersMap.put(s.getGroomName(), s.getPeerName());
       }
     }
-
     if (detailed) {
       return new ClusterStatus(groomPeersMap, totalTasks, totalTaskCapacity,
           state);
@@ -561,23 +446,58 @@ public class BSPMaster implements JobSub
     }
   }
 
+  @Override
+  public WorkerProtocol findGroomServer(GroomServerStatus status) {
+    return groomServers.get(status);
+  }
+
+  @Override
+  public Collection<WorkerProtocol> findGroomServers() {
+    return groomServers.values();
+  }
+
+  @Override
+  public Collection<GroomServerStatus> groomServerStatusKeySet() {
+    return groomServers.keySet();
+  }
+
+  @Override
+  public void addJobInProgressListener(JobInProgressListener listener) {
+    jobInProgressListeners.add(listener);
+  }
+
+  @Override
+  public void removeJobInProgressListener(JobInProgressListener listener) {
+    jobInProgressListeners.remove(listener);
+  }
+
+  @Override
+  public Map<String, String> currentGroomServerPeers() {
+    Map<String, String> tmp = new HashMap<String, String>();
+    for (GroomServerStatus status : groomServers.keySet()) {
+      tmp.put(status.getGroomName(), status.getPeerName());
+    }
+    return tmp;
+  }
+
   /**
    * Adds a job to the bsp master. Make sure that the checks are inplace before
    * adding a job. This is the core job submission logic
    * 
    * @param jobId The id for the job submitted which needs to be added
    */
-  private synchronized JobStatus addJob(BSPJobID jodId, JobInProgress job) {
+  private synchronized JobStatus addJob(BSPJobID jobId, JobInProgress job) {
     totalSubmissions++;
     synchronized (jobs) {
-      synchronized (jobInitQueue) {
-        jobs.put(job.getProfile().getJobID(), job);
-        taskScheduler.addJob(job);
-        jobInitQueue.add(job);
-        jobInitQueue.notifyAll();
+      jobs.put(job.getProfile().getJobID(), job);
+      for (JobInProgressListener listener : jobInProgressListeners) {
+        try {
+          listener.jobAdded(job);
+        } catch (IOException ioe) {
+          LOG.error("Fail to alter Scheduler a job is added.", ioe);
+        }
       }
     }
-
     return job.getStatus();
   }
 
@@ -600,11 +520,11 @@ public class BSPMaster implements JobSub
     List<JobStatus> jobStatusList = new ArrayList<JobStatus>();
     for (JobInProgress jip : jips) {
       JobStatus status = jip.getStatus();
-      
+
       status.setStartTime(jip.getStartTime());
       // Sets the user name
       status.setUsername(jip.getProfile().getUser());
-      
+
       if (toComplete) {
         if (status.getRunState() == JobStatus.RUNNING
             || status.getRunState() == JobStatus.PREP) {
@@ -694,26 +614,10 @@ public class BSPMaster implements JobSub
   }
 
   public void shutdown() {
-    this.interServer.stop();
+    this.masterServer.stop();
   }
 
-  public void createTaskEntry(TaskAttemptID taskid, String groomServer,
-      TaskInProgress taskInProgress) {
-    LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId()
-        + ", for groom '" + groomServer + "'");
-
-    // taskid --> groom
-    taskIdToGroomNameMap.put(taskid, groomServer);
-
-    // groom --> taskid
-    TreeSet<TaskAttemptID> taskset = groomNameToTaskIdsMap.get(groomServer);
-    if (taskset == null) {
-      taskset = new TreeSet<TaskAttemptID>();
-      groomNameToTaskIdsMap.put(groomServer, taskset);
-    }
-    taskset.add(taskid);
-
-    // taskid --> TIP
-    taskIdToTaskInProgressMap.put(taskid, taskInProgress);
+  public BSPMaster.State currentState() {
+    return this.state;
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Wed Jan 12 12:32:39 2011
@@ -56,9 +56,12 @@ public class BSPPeer implements Watcher,
   private final String bspRoot;
   private final String zookeeperAddr;
 
-  private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
-  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
-  private final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+  private final Map<InetSocketAddress, BSPPeerInterface> peers = 
+    new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = 
+    new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+  private final ConcurrentLinkedQueue<BSPMessage> localQueue = 
+    new ConcurrentLinkedQueue<BSPMessage>();
   private Set<String> allPeerNames = new HashSet<String>();
   private InetSocketAddress peerAddress;
   private TaskStatus currentTaskStatus;
@@ -68,7 +71,6 @@ public class BSPPeer implements Watcher,
    */
   public BSPPeer(Configuration conf) throws IOException {
     this.conf = conf;
-
     String bindAddress = conf.get(Constants.PEER_HOST,
         Constants.DEFAULT_PEER_HOST);
     int bindPort = conf
@@ -79,6 +81,8 @@ public class BSPPeer implements Watcher,
         + ":"
         + conf.getInt(Constants.ZOOKEPER_CLIENT_PORT,
             Constants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    // TODO: may need to dynamically reflect the underlying
+    //       network, e.g. IP address and port.
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
 
     reinitialize();
@@ -90,6 +94,7 @@ public class BSPPeer implements Watcher,
       server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
           .getPort(), conf);
       server.start();
+      LOG.info(" BSPPeer address:"+peerAddress.getHostName()+" port:"+peerAddress.getPort());
     } catch (IOException e) {
       e.printStackTrace();
     }

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic directive exchanged between the {@link org.apache.hama.bsp.BSPMaster}
+ * and a {@link org.apache.hama.bsp.GroomServer}: a Request carries 'actions' for
+ * the groom to take, while a Response carries a GroomServerStatus.
+ */
+public class Directive implements Writable {
+
+  public static final Log LOG = LogFactory.getLog(Directive.class);
+
+  private long timestamp;
+  private Directive.Type type;
+  private Map<String, String> groomServerPeers;
+  private GroomServerAction[] actions;
+  private GroomServerStatus status;
+
+  public static enum Type {
+    Request(1), Response(2);
+    int t;
+
+    Type(int t) {
+      this.t = t;
+    }
+
+    public int value() {
+      return this.t;
+    }
+  };
+
+  public Directive() {
+    this.timestamp = System.currentTimeMillis();
+  }
+
+  public Directive(Map<String, String> groomServerPeers,
+      GroomServerAction[] actions) {
+    this();
+    this.type = Directive.Type.Request;
+    this.groomServerPeers = groomServerPeers;
+    this.actions = actions;
+  }
+
+  public Directive(GroomServerStatus status) {
+    this();
+    this.type = Directive.Type.Response;
+    this.status = status;
+  }
+
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  public Directive.Type getType() {
+    return this.type;
+  }
+
+  public Map<String, String> getGroomServerPeers() {
+    return this.groomServerPeers;
+  }
+
+  public GroomServerAction[] getActions() {
+    return this.actions;
+  }
+
+  public GroomServerStatus getStatus() {
+    return this.status;
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(this.timestamp);
+    out.writeInt(this.type.value());
+    if (getType().value() == Directive.Type.Request.value()) {
+      if (this.actions == null) {
+        WritableUtils.writeVInt(out, 0);
+      } else {
+        WritableUtils.writeVInt(out, actions.length);
+        for (GroomServerAction action : this.actions) {
+          WritableUtils.writeEnum(out, action.getActionType());
+          action.write(out);
+        }
+      }
+      String[] groomServerNames = groomServerPeers.keySet().toArray(
+          new String[0]);
+      WritableUtils.writeCompressedStringArray(out, groomServerNames);
+
+      List<String> groomServerAddresses = new ArrayList<String>(
+          groomServerNames.length);
+      for (String groomName : groomServerNames) {
+        groomServerAddresses.add(groomServerPeers.get(groomName));
+      }
+      WritableUtils.writeCompressedStringArray(out, groomServerAddresses
+          .toArray(new String[0]));
+    } else if (getType().value() == Directive.Type.Response.value()) {
+      this.status.write(out);
+    } else {
+      throw new IllegalStateException("Wrong directive type:" + getType());
+    }
+
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.timestamp = in.readLong();
+    int t = in.readInt();
+    if (Directive.Type.Request.value() == t) {
+      this.type = Directive.Type.Request;
+      int length = WritableUtils.readVInt(in);
+      if (length > 0) {
+        this.actions = new GroomServerAction[length];
+        for (int i = 0; i < length; ++i) {
+          GroomServerAction.ActionType actionType = WritableUtils.readEnum(in,
+              GroomServerAction.ActionType.class);
+          actions[i] = GroomServerAction.createAction(actionType);
+          actions[i].readFields(in);
+        }
+      } else {
+        this.actions = null;
+      }
+      String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
+      String[] groomServerAddresses = WritableUtils
+          .readCompressedStringArray(in);
+      groomServerPeers = new HashMap<String, String>(groomServerNames.length);
+
+      for (int i = 0; i < groomServerNames.length; i++) {
+        groomServerPeers.put(groomServerNames[i], groomServerAddresses[i]);
+      }
+    } else if (Directive.Type.Response.value() == t) {
+      this.type = Directive.Type.Response;
+      this.status = new GroomServerStatus();
+      this.status.readFields(in);
+    } else {
+      throw new IllegalStateException("Wrong directive type:" + t);
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+class FCFSQueue implements Queue<JobInProgress> {
+
+  public static final Log LOG = LogFactory.getLog(FCFSQueue.class);
+  private final String name;
+  private BlockingQueue<JobInProgress> queue = new LinkedBlockingQueue<JobInProgress>();
+
+  public FCFSQueue(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public String getName() {
+    return this.name;
+  }
+
+  @Override
+  public void addJob(JobInProgress job) {
+    try {
+      queue.put(job);
+    } catch (InterruptedException ie) {
+      LOG.error("Fail to add a job to the " + this.name + " queue.", ie);
+    }
+  }
+
+  @Override
+  public void removeJob(JobInProgress job) {
+    queue.remove(job);
+  }
+
+  @Override
+  public JobInProgress removeJob() {
+    try {
+      return queue.take();
+    } catch (InterruptedException ie) {
+      LOG.error("Fail to remove a job from the " + this.name + " queue.", ie);
+    }
+    return null;
+  }
+
+  @Override
+  public Collection<JobInProgress> jobs() {
+    return queue;
+  }
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Jan 12 12:32:39 2011
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.RunJar;
@@ -47,13 +49,17 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.ipc.InterServerProtocol;
+import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.ipc.WorkerProtocol;
+
+public class GroomServer implements Runnable, WorkerProtocol {
 
-public class GroomServer implements Runnable {
   public static final Log LOG = LogFactory.getLog(GroomServer.class);
-  private static BSPPeer bspPeer;
+  private BSPPeer bspPeer;
   static final String SUBDIR = "groomServer";
 
+  private volatile static int REPORT_INTERVAL = 60 * 1000;
+
   Configuration conf;
 
   // Constants
@@ -62,19 +68,16 @@ public class GroomServer implements Runn
   };
 
   // Running States and its related things
+  volatile boolean initialized = false;
   volatile boolean running = true;
   volatile boolean shuttingDown = false;
-  boolean justStarted = true;
   boolean justInited = true;
   GroomServerStatus status = null;
-  short heartbeatResponseId = -1;
-  private volatile int heartbeatInterval = 3 * 1000;
 
   // Attributes
   String groomServerName;
   String localHostname;
   InetSocketAddress bspMasterAddr;
-  InterServerProtocol jobClient;
 
   // Filesystem
   // private LocalDirAllocator localDirAllocator;
@@ -82,7 +85,6 @@ public class GroomServer implements Runn
   FileSystem systemFS = null;
 
   // Job
-  boolean acceptNewTasks = true;
   private int failures;
   private int maxCurrentTasks = 1;
   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
@@ -90,6 +92,14 @@ public class GroomServer implements Runn
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<BSPJobID, RunningJob> runningJobs = null;
 
+  // new nexus between GroomServer and BSPMaster
+  // holds/manages all tasks
+  List<TaskInProgress> tasksList = new CopyOnWriteArrayList<TaskInProgress>();
+
+  private String rpcServer;
+  private Server workerServer;
+  MasterProtocol masterClient;
+
   private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
 
   public GroomServer(Configuration conf) throws IOException {
@@ -114,7 +124,6 @@ public class GroomServer implements Runn
       this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
           "default"), conf.get("bsp.dns.nameserver", "default"));
     }
-
     // check local disk
     checkLocalDirs(conf.getStrings("bsp.local.dir"));
     deleteLocalFiles("groomserver");
@@ -123,20 +132,75 @@ public class GroomServer implements Runn
     this.tasks.clear();
     this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
     this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
-    this.acceptNewTasks = true;
-
     this.conf.set(Constants.PEER_HOST, localHostname);
+    this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
     bspPeer = new BSPPeer(conf);
 
+    int rpcPort = -1;
+    String rpcAddr = null;
+    if (false == this.initialized) {
+      rpcAddr = conf.get(Constants.GROOM_RPC_HOST,
+          Constants.DEFAULT_GROOM_RPC_HOST);
+      rpcPort = conf.getInt(Constants.GROOM_RPC_PORT,
+          Constants.DEFAULT_GROOM_RPC_PORT);
+      if (-1 == rpcPort || null == rpcAddr)
+        throw new IllegalArgumentException("Error rpc address " + rpcAddr
+            + " port" + rpcPort);
+      this.workerServer = RPC.getServer(this, rpcAddr, rpcPort, conf);
+      this.workerServer.start();
+      this.rpcServer = rpcAddr + ":" + rpcPort;
+      LOG.info("Worker rpc server --> " + rpcServer);
+    }
+
     this.groomServerName = "groomd_" + bspPeer.getPeerName().replace(':', '_');
     LOG.info("Starting groom: " + this.groomServerName);
 
     DistributedCache.purgeCache(this.conf);
 
-    this.jobClient = (InterServerProtocol) RPC.waitForProxy(
-        InterServerProtocol.class, InterServerProtocol.versionID,
-        bspMasterAddr, conf);
+    // establish the communication link to bsp master
+    this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class,
+        MasterProtocol.versionID, bspMasterAddr, conf);
+
+    // enroll in bsp master
+    if (-1 == rpcPort || null == rpcAddr)
+      throw new IllegalArgumentException("Error rpc address " + rpcAddr
+          + " port" + rpcPort);
+    if (!this.masterClient.register(new GroomServerStatus(groomServerName,
+        bspPeer.getPeerName(), cloneAndResetRunningTaskStatuses(), failures,
+        maxCurrentTasks, this.rpcServer))) {
+      LOG.error("There is a problem in establishing communication"
+          + " link with BSPMaster");
+      throw new IOException("There is a problem in establishing"
+          + " communication link with BSPMaster.");
+    }
     this.running = true;
+    this.initialized = true;
+  }
+
+  @Override
+  public void dispatch(Directive directive) throws IOException {
+    // update tasks status
+    GroomServerAction[] actions = directive.getActions();
+    bspPeer.setAllPeerNames(directive.getGroomServerPeers().values());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got Response from BSPMaster with "
+          + ((actions != null) ? actions.length : 0) + " actions");
+    }
+    // perform actions
+    if (actions != null) {
+      for (GroomServerAction action : actions) {
+        if (action instanceof LaunchTaskAction) {
+          startNewTask((LaunchTaskAction) action);
+        } else {
+          try {
+            tasksToCleanup.put(action);
+          } catch (InterruptedException e) {
+            LOG.error("Fail to move action to cleanup list.");
+            e.printStackTrace();
+          }
+        }
+      }
+    }
   }
 
   private static void checkLocalDirs(String[] localDirs)
@@ -193,70 +257,19 @@ public class GroomServer implements Runn
   }
 
   public State offerService() throws Exception {
-    long lastHeartbeat = 0;
 
     while (running && !shuttingDown) {
       try {
-        long now = System.currentTimeMillis();
-
-        long waitTime = heartbeatInterval - (now - lastHeartbeat);
-        if (waitTime > 0) {
-          // sleeps for the wait time
-          Thread.sleep(waitTime);
-        }
-
         if (justInited) {
-          String dir = jobClient.getSystemDir();
+          String dir = masterClient.getSystemDir();
           if (dir == null) {
-            throw new IOException("Failed to get system directory");
+            LOG.error("Fail to get system directory.");
+            throw new IOException("Fail to get system directory.");
           }
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(conf);
         }
-
-        // Send the heartbeat and process the bspmaster's directives
-        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
-
-        if (acceptNewTasks) {
-          bspPeer.setAllPeerNames(heartbeatResponse.getGroomServers().values());
-        }
-
-        for (String peer : bspPeer.getAllPeerNames()) {
-          LOG.debug("Remote peer, host:port is " + peer);
-        }
-
-        GroomServerAction[] actions = heartbeatResponse.getActions();
-        LOG.debug("Got heartbeatResponse from BSPMaster with responseId: "
-            + heartbeatResponse.getResponseId() + " and "
-            + ((actions != null) ? actions.length : 0) + " actions");
-
-        if (actions != null) {
-          acceptNewTasks = false;
-
-          for (GroomServerAction action : actions) {
-            if (action instanceof LaunchTaskAction) {
-              startNewTask((LaunchTaskAction) action);
-            } else {
-              tasksToCleanup.put(action);
-            }
-          }
-        }
-
-        //
-        // The heartbeat got through successfully!
-        //
-        heartbeatResponseId = heartbeatResponse.getResponseId();
-
-        // Note the time when the heartbeat returned, use this to decide when to
-        // send the
-        // next heartbeat
-        lastHeartbeat = System.currentTimeMillis();
-
-        justStarted = false;
         justInited = false;
-      } catch (InterruptedException ie) {
-        LOG.info("Interrupted. Closing down.");
-        return State.INTERRUPTED;
       } catch (DiskErrorException de) {
         String msg = "Exiting groom server for disk error:\n"
             + StringUtils.stringifyException(de);
@@ -271,7 +284,6 @@ public class GroomServer implements Runn
         LOG.error(msg);
       }
     }
-
     return State.NORMAL;
   }
 
@@ -296,7 +308,6 @@ public class GroomServer implements Runn
     Task task = tip.getTask();
     conf.addResource(task.getJobFile());
     BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
-
     Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
         + task.getTaskID() + "/" + "job.xml");
 
@@ -392,43 +403,6 @@ public class GroomServer implements Runn
     }
   }
 
-  private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
-    // 
-    // Check if the last heartbeat got through...
-    // if so then build the heartbeat information for the BSPMaster;
-    // else resend the previous status information.
-    //
-    if (status == null) {
-      synchronized (this) {
-        status = new GroomServerStatus(groomServerName, bspPeer.getPeerName(),
-            cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks);
-      }
-    } else {
-      LOG.info("Resending 'status' to '" + bspMasterAddr.getHostName()
-          + "' with reponseId '" + heartbeatResponseId + "'");
-    }
-
-    // TODO - Later, acceptNewTask is to be set by the status of groom server.
-    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
-        justStarted, justInited, acceptNewTasks, heartbeatResponseId, status
-            .getTaskReports().size());
-
-    synchronized (this) {
-      for (TaskStatus taskStatus : status.getTaskReports()) {
-        if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
-          LOG.debug("Removing task from runningTasks: "
-              + taskStatus.getTaskId());
-          runningTasks.remove(taskStatus.getTaskId());
-        }
-      }
-    }
-
-    // Force a rebuild of 'status' on the next iteration
-    status = null;
-
-    return heartbeatResponse;
-  }
-
   private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
     for (TaskInProgress tip : runningTasks.values()) {
@@ -469,7 +443,6 @@ public class GroomServer implements Runn
         } finally {
           // close();
         }
-
         if (shuttingDown) {
           return;
         }
@@ -490,11 +463,11 @@ public class GroomServer implements Runn
 
   public synchronized void close() throws IOException {
     this.running = false;
+    this.initialized = false;
     bspPeer.close();
     cleanupStorage();
-
-    // shutdown RPC connections
-    RPC.stopProxy(jobClient);
+    this.workerServer.stop();
+    RPC.stopProxy(masterClient);
   }
 
   public static Thread startGroomServer(final GroomServer hrs) {
@@ -523,7 +496,7 @@ public class GroomServer implements Runn
 
     public TaskInProgress(Task task, String groomServer) {
       this.task = task;
-      this.taskStatus = new TaskStatus(task.getTaskID(), 0,
+      this.taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), 0,
           TaskStatus.State.UNASSIGNED, "running", groomServer,
           TaskStatus.Phase.STARTING);
     }
@@ -539,7 +512,7 @@ public class GroomServer implements Runn
       this.runner = task.createRunner(bspPeer, this.jobConf);
       this.runner.start();
 
-      // Check state of Task
+      // Check state of a Task
       while (true) {
         try {
           Thread.sleep(1000);
@@ -550,7 +523,7 @@ public class GroomServer implements Runn
         if (bspPeer.getLocalQueueSize() == 0
             && bspPeer.getOutgoingQueueSize() == 0 && !runner.isAlive()) {
           taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-          acceptNewTasks = true;
+          doReport();
           break;
         }
       }
@@ -558,11 +531,42 @@ public class GroomServer implements Runn
     }
 
     /**
+     * Build the refreshed GroomServerStatus and report it back to BSPMaster.
+     */
+    private void doReport() {
+      GroomServerStatus gss = new GroomServerStatus(groomServerName, bspPeer
+          .getPeerName(), updateTaskStatus(), failures, maxCurrentTasks,
+          rpcServer);
+      try {
+        boolean ret = masterClient.report(new Directive(gss));
+        if (!ret) {
+          LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
+              + " groom name: " + gss.getGroomName() + " peer name:"
+              + gss.getPeerName() + " rpc server:" + rpcServer);
+        }
+      } catch (IOException ioe) {
+        LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
+      }
+    }
+
+    private List<TaskStatus> updateTaskStatus() {
+      List<TaskStatus> tlist = new ArrayList<TaskStatus>();
+      for (TaskInProgress tip : runningTasks.values()) {
+        TaskStatus stus = tip.getStatus();
+        stus.setProgress(1f);
+        stus.setRunState(TaskStatus.State.SUCCEEDED);
+        stus.setPhase(TaskStatus.Phase.CLEANUP);
+        tlist.add((TaskStatus) stus.clone());
+      }
+      return tlist;
+    }
+
+    /**
      * This task has run on too long, and should be killed.
      */
     public synchronized void killAndCleanup(boolean wasFailure)
         throws IOException {
-      // TODO 
+      // TODO
       runner.kill();
     }
 
@@ -616,4 +620,26 @@ public class GroomServer implements Runn
           + groomServerClass.toString(), e);
     }
   }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    if (protocol.equals(WorkerProtocol.class.getName())) {
+      return WorkerProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to GroomServer: " + protocol);
+    }
+  }
+
+  /**
+   * GroomServer address information.
+   * 
+   * @return bsp peer information in the form of "address:port".
+   */
+  public String getBspPeerName() {
+    if (null != this.bspPeer)
+      return this.bspPeer.getPeerName();
+    return null;
+  }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java Wed Jan 12 12:32:39 2011
@@ -17,11 +17,14 @@
  */
 package org.apache.hama.bsp;
 
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hama.ipc.WorkerProtocol;
 
 /**
- * Manages information about the {@link GroomServer}s running on a cluster.
- * This interface exits primarily to test the {@link BSPMaster}, and is not
- * intended to be implemented by users.
+ * Manages information about the {@link GroomServer}s in the cluster 
+ * environment. This interface is not intended to be implemented by users.
  */
 interface GroomServerManager {
 
@@ -30,6 +33,47 @@ interface GroomServerManager {
    * @param detailed if true then report groom names as well
    * @return summary of the state of the cluster
    */
-  public ClusterStatus getClusterStatus(boolean detailed);
-  
+  ClusterStatus getClusterStatus(boolean detailed);
+
+  /**
+   * Find the WorkerProtocol that corresponds to a groom server status.
+   * 
+   * @param status The GroomServerStatus identifying the GroomServer
+   * @return WorkerProtocol associated with the given status
+   */
+  WorkerProtocol findGroomServer(GroomServerStatus status);
+
+  /**
+   * Find the collection of groom servers.
+   * 
+   * @return Collection of groom servers list.
+   */
+  Collection<WorkerProtocol> findGroomServers();
+
+  /**
+   * Collection of GroomServerStatus as the key set.
+   *
+   * @return Collection of GroomServerStatus.
+   */
+  Collection<GroomServerStatus> groomServerStatusKeySet();
+
+  /**
+   * Registers a JobInProgressListener with GroomServerManager. Afterwards,
+   * adding a JobInProgress will trigger the listener's jobAdded function.
+   * @param listener the JobInProgressListener to be added.
+   */
+  void addJobInProgressListener(JobInProgressListener listener);
+
+  /**
+   * Unregisters a JobInProgressListener from GroomServerManager. Afterwards,
+   * the listener is no longer notified of JobInProgress events.
+   * @param listener the JobInProgressListener to be removed.
+   */
+  void removeJobInProgressListener(JobInProgressListener listener);
+
+  /**
+   * Current GroomServer Peers.
+   * @return GroomName and PeerName(host:port) in pair. 
+   */
+  Map<String, String> currentGroomServerPeers();
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Wed Jan 12 12:32:39 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@ public class GroomServerStatus implement
   
   String groomName;
   String peerName;
+  String rpcServer;
   int failures;
   List<TaskStatus> taskReports;
   
@@ -54,18 +56,25 @@ public class GroomServerStatus implement
   private int maxTasks;
 
   public GroomServerStatus() {
-    taskReports = new ArrayList<TaskStatus>();
+    //taskReports = new ArrayList<TaskStatus>();
+    taskReports = new CopyOnWriteArrayList<TaskStatus>();
   }
   
   public GroomServerStatus(String groomName, String peerName,
       List<TaskStatus> taskReports, int failures, int maxTasks) {
+    this(groomName, peerName, taskReports, failures, maxTasks, "");
+  }
+
+  public GroomServerStatus(String groomName, String peerName,
+      List<TaskStatus> taskReports, int failures, int maxTasks, String rpc) {
     this.groomName = groomName;
     this.peerName = peerName;
     this.taskReports = new ArrayList<TaskStatus>(taskReports);
     this.failures = failures;
     this.maxTasks = maxTasks;
-  }
-  
+    this.rpcServer = rpc;
+  } 
+
   public String getGroomName() {
     return groomName;
   }
@@ -73,11 +82,15 @@ public class GroomServerStatus implement
   /**
    * The host (and port) from where the groom server can be reached.
    *
-   * @return The groom server address in the format hostname:port
+   * @return The groom server address in the form of "hostname:port"
    */
   public String getPeerName() {
     return peerName;
   }
+
+  public String getRpcServer(){
+    return rpcServer;
+  }
   
   /**
    * Get the current tasks at the GroomServer.
@@ -116,12 +129,57 @@ public class GroomServerStatus implement
       TaskStatus.State state = ts.getRunState();
       if(state == TaskStatus.State.RUNNING || 
            state == TaskStatus.State.UNASSIGNED) {
-             taskCount++;
+        taskCount++;
       }    
     }    
     
     return taskCount;    
   }
+
+  /**
+   * Hash over the identity fields only (groomName, peerName, rpcServer).
+   * Per the original note, BSPMaster stores GroomServerStatus as a key, so
+   * the mutable fields (failures, taskReports, maxTasks) are left out and
+   * the hash does not change when they do.
+   * A field that is still null (the no-arg constructor does not assign the
+   * three strings) contributes 0 instead of raising NullPointerException.
+   *
+   * @return hash code built from the three identity strings.
+   */
+  @Override
+  public int hashCode(){
+    int result = 17;
+    result = 37*result + (null == groomName ? 0 : groomName.hashCode());
+    result = 37*result + (null == peerName ? 0 : peerName.hashCode());
+    result = 37*result + (null == rpcServer ? 0 : rpcServer.hashCode());
+    // Mutable state is intentionally excluded so that an instance used as
+    // a map key keeps the same hash while its task reports change.
+    return result;
+  }
+
+  /**
+   * Compares the identity fields groomName, peerName and rpcServer;
+   * failures, taskReports and maxTasks are ignored.
+   * A null field equals only another null, so no NullPointerException.
+   * @param o object to compare with; may be null.
+   * @return true if o has the same class and the same identity fields.
+   */
+  @Override
+  public boolean equals(Object o){
+    if (o == this) return true;
+    if (null == o) return false;
+    if (getClass() != o.getClass()) return false;  
+
+    GroomServerStatus s = (GroomServerStatus) o;
+    return sameOrBothNull(s.groomName, groomName)
+        && sameOrBothNull(s.peerName, peerName)
+        && sameOrBothNull(s.rpcServer, rpcServer);
+  }
+
+  /** Null-safe String equality helper for equals(Object). */
+  private static boolean sameOrBothNull(String a, String b){
+    return null == a ? null == b : a.equals(b);
+  }
   
   /* (non-Javadoc)
    * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
@@ -130,6 +188,7 @@ public class GroomServerStatus implement
   public void readFields(DataInput in) throws IOException {
     this.groomName = Text.readString(in);
     this.peerName = Text.readString(in);
+    this.rpcServer = Text.readString(in);
     this.failures = in.readInt();
     this.maxTasks = in.readInt();
     taskReports.clear();
@@ -150,6 +209,7 @@ public class GroomServerStatus implement
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, groomName);
     Text.writeString(out, peerName);
+    Text.writeString(out, rpcServer);
     out.writeInt(failures);
     out.writeInt(maxTasks);
     out.writeInt(taskReports.size());
@@ -161,5 +221,4 @@ public class GroomServerStatus implement
   public Iterator<TaskStatus> taskReports() {
     return taskReports.iterator();
   }
-
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Wed Jan 12 12:32:39 2011
@@ -45,6 +45,7 @@ class JobInProgress {
 
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
   boolean tasksInited = false;
+  boolean jobInited = false;
 
   Configuration conf;
   JobProfile profile;
@@ -76,7 +77,7 @@ class JobInProgress {
     this.localFs = FileSystem.getLocal(conf);
     this.jobFile = jobFile;
     this.master = master;
-    this.status = new JobStatus(jobId, null, 0.0f, 0.0f, JobStatus.PREP);
+    this.status = new JobStatus(jobId, null, 0.0f, 0.0f, JobStatus.State.PREP.value());
     this.startTime = System.currentTimeMillis();
     this.superstepCounter = 0;
     this.restartCount = 0;
@@ -139,6 +140,21 @@ class JobInProgress {
     return jobId;
   }
 
+  public synchronized TaskInProgress findTaskInProgress(TaskID id){
+    // Skip the scan entirely until the tasksInited flag has been set.
+    if(!areTasksInited()) return null;
+    for(TaskInProgress candidate: tasks){
+      if(candidate.getTaskId().equals(id)){
+        return candidate;
+      }
+    }
+    return null; // no task carries the requested id
+  }
+
+  public synchronized boolean areTasksInited(){
+    return this.tasksInited;
+  }
+
   public String toString() {
     return "jobName:" + profile.getJobName() + "\n" + "submit user:"
         + profile.getUser() + "\n" + "JobId:" + jobId + "\n" + "JobFile:"
@@ -154,7 +170,9 @@ class JobInProgress {
       return;
     }
 
-    LOG.debug("numBSPTasks: " + numBSPTasks);
+    if(LOG.isDebugEnabled()){
+      LOG.debug("numBSPTasks: " + numBSPTasks);
+    }
     
     // adjust number of map tasks to actual number of splits
     this.tasks = new TaskInProgress[numBSPTasks];
@@ -192,7 +210,6 @@ class JobInProgress {
     } catch (IOException e) {
       e.printStackTrace();
     }
-
     return result;
   }
 

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * A listener for changes in a {@link JobInProgress job}'s lifecycle in the
+ * {@link BSPMaster}.
+ */
+abstract class JobInProgressListener {
+
+  /**
+   * Invoked when a new job has been added to the {@link BSPMaster}.
+   * @param job The job that was added.
+   * @throws IOException may be thrown by implementations.
+   */
+  public abstract void jobAdded(JobInProgress job) throws IOException;
+
+  /**
+   * Invoked when a job has been removed from the {@link BSPMaster}.
+   * @param job The job that was removed.
+   * @throws IOException may be thrown by implementations.
+   */
+  public abstract void jobRemoved(JobInProgress job) throws IOException;
+  
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Wed Jan 12 12:32:39 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.WritableFact
 import org.apache.hadoop.io.WritableFactory;
 
 public class JobStatus implements Writable, Cloneable {
+
   static {
     WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
       public Writable newInstance() {
@@ -35,6 +36,21 @@ public class JobStatus implements Writab
     });
   }
 
+  public static enum State{
+    RUNNING(1),
+    SUCCEEDED(2),
+    FAILED(3),
+    PREP(4),
+    KILLED(5);
+    private final int s; // numeric run-state code; fixed per constant
+    State(int s){
+      this.s = s;
+    }
+    public int value(){
+      return this.s;
+    }
+  }
+
   public static final int RUNNING = 1;
   public static final int SUCCEEDED = 2;
   public static final int FAILED = 3;
@@ -45,6 +61,7 @@ public class JobStatus implements Writab
   private float progress;
   private float cleanupProgress;
   private float setupProgress;
+  private volatile State state;// runState in enum
   private int runState;
   private long startTime;
   private String schedulingInfo = "NA";
@@ -77,6 +94,7 @@ public class JobStatus implements Writab
     this.progress = progress;
     this.cleanupProgress = cleanupProgress;
     this.runState = runState;
+    this.state = State.values()[runState-1];
     this.superstepCount = superstepCount;
     this.user = user;
   }
@@ -109,6 +127,14 @@ public class JobStatus implements Writab
     this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p));
   }
 
+  public JobStatus.State getState(){
+    return this.state;
+  }
+
+  public void setState(JobStatus.State state){
+    this.state = state;
+  }
+
   public synchronized int getRunState() {
     return runState;
   }

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+
+public interface Queue<T>{
+
+  /**
+   * The queue name.
+   * @return the name of the current queue.
+   */ 
+  String getName();
+
+  /**
+   * Add a job to this queue.
+   * @param job to be added to the queue.
+   */
+  void addJob(T job);
+
+  /**
+   * Remove the given job from the queue.
+   * @param job to be removed from the queue.
+   */
+  void removeJob(T job);
+
+  /**
+   * Remove a job from the queue and hand it to the caller.
+   * @return job that is removed from the queue.
+   */
+  T removeJob();
+
+  /**
+   * Return all jobs stored in this queue.
+   * @return Collection of jobs.
+   */
+  public Collection<T> jobs();
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class QueueManager{
+
+  private ConcurrentMap<String, Queue<JobInProgress>> queues = 
+    new ConcurrentHashMap<String, Queue<JobInProgress>>();
+
+  public QueueManager(Configuration conf){ } // conf is currently unused
+
+  /**
+   * Initialize a job's tasks; an IOException is printed, not rethrown.
+   * @param job to be initialized.
+   */
+  public void initJob(JobInProgress job){
+    try{
+      //job.updateStatus();
+      job.initTasks();
+    }catch(IOException ioe){
+      ioe.printStackTrace();
+    }
+  }
+
+  /**
+   * Add a job to the named queue; does nothing if no such queue exists.
+   * @param name of the queue.
+   * @param job to be added.
+   */
+  public void addJob(String name, JobInProgress job){
+    Queue<JobInProgress> queue = queues.get(name);
+    if(null != queue) queue.addJob(job);
+  }
+
+  /**
+   * Remove the given job from the named queue; no-op if the queue is absent.
+   * @param name of the queue from which the job is removed.
+   * @param job to be removed from the queue.
+   */
+  public void removeJob(String name, JobInProgress job){
+    Queue<JobInProgress> queue = queues.get(name);
+    if(null != queue) queue.removeJob(job);
+  }
+
+  /**
+   * Move the given job from one queue to another.
+   * @param from name of the queue the job is removed from.
+   * @param to name of the queue the job is added to.
+   */
+  public void moveJob(String from, String to, JobInProgress job){
+    synchronized(queues){ // NOTE(review): addJob/removeJob do not take this lock
+      removeJob(from, job);
+      addJob(to, job);
+    }  
+  }
+
+  /**
+   * Create a FCFS queue under the given name unless one already exists.
+   * @param name of the queue.
+   */
+  public void createFCFSQueue(String name){
+    queues.putIfAbsent(name, new FCFSQueue(name));
+  }
+
+  /**
+   * Find Queue according to the name specified.
+   * @param name of the queue.
+   * @return queue of JobInProgress, or null if no queue has that name.
+   */
+  public Queue<JobInProgress> findQueue(String name){
+     return queues.get(name);
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * Contract for components that schedule a job to GroomServer(s).
+ */
+public interface Schedulable{
+
+  /**
+   * Schedule job to designated GroomServer(s) immediately.
+   * @param job to be scheduled.
+   * @param statuses of the targeted GroomServer(s).
+   * @throws IOException may be thrown by implementations.
+   */
+  void schedule(JobInProgress job, GroomServerStatus... statuses) 
+      throws IOException;
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Wed Jan 12 12:32:39 2011
@@ -17,83 +17,163 @@
  */
 package org.apache.hama.bsp;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hama.ipc.WorkerProtocol;
 
 class SimpleTaskScheduler extends TaskScheduler {
+
   private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);
-  List<JobInProgress> jobQueue;
 
-  public SimpleTaskScheduler() {
-    jobQueue = new ArrayList<JobInProgress>();
+  public static final String WAIT_QUEUE = "waitQueue";
+  public static final String PROCESSING_QUEUE = "processingQueue";
+  public static final String FINISHED_QUEUE = "finishedQueue";
+
+  private QueueManager queueManager;
+  private volatile boolean initialized;
+  private JobListener jobListener;
+  private JobProcessor jobProcessor;
+
+  private class JobListener extends JobInProgressListener {
+    @Override
+    public void jobAdded(JobInProgress job) throws IOException {
+      queueManager.initJob(job); // init task
+      queueManager.addJob(WAIT_QUEUE, job);
+    }
+
+    @Override
+    public void jobRemoved(JobInProgress job) throws IOException {
+      // queueManager.removeJob(WAIT_QUEUE, job);
+      queueManager.moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
+    }
   }
 
-  @Override
-  public void addJob(JobInProgress job) {
-    LOG.debug("Added a job (" + job + ") to scheduler (remaining jobs: "
-        + (jobQueue.size() + 1) + ")");
-    jobQueue.add(job);
+  private class JobProcessor extends Thread implements Schedulable {
+    JobProcessor() {
+      super("JobProcess");
+    }
+
+    /**
+     * Main logic scheduling task to GroomServer(s). Also, it will move
+     * JobInProgress from Wait Queue to Processing Queue.
+     */
+    public void run() {
+      if (false == initialized) {
+        throw new IllegalStateException("SimpleTaskScheduler initialization"
+            + " is not yet finished!");
+      }
+      while (initialized) {
+        Queue<JobInProgress> queue = queueManager.findQueue(WAIT_QUEUE);
+        if (null == queue) {
+          LOG.error(WAIT_QUEUE + " does not exist.");
+          throw new NullPointerException(WAIT_QUEUE + " does not exist.");
+        }
+        // move a job from the wait queue to the processing queue
+        JobInProgress j = queue.removeJob();
+        queueManager.addJob(PROCESSING_QUEUE, j);
+        // schedule
+        Collection<GroomServerStatus> glist = groomServerManager
+            .groomServerStatusKeySet();
+        schedule(j, (GroomServerStatus[]) glist
+            .toArray(new GroomServerStatus[glist.size()]));
+      }
+    }
+
+    /**
+     * Schedule job to designated GroomServer(s) immediately, one worker each.
+     * The pool is shut down after submission so its threads do not leak.
+     * @param job to be scheduled.
+     * @param statuses of the targeted GroomServer(s).
+     */
+    @Override
+    public void schedule(JobInProgress job, GroomServerStatus... statuses) {
+      ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
+      final int numGroomServers = clusterStatus.getGroomServers();
+      final ScheduledExecutorService sched = Executors
+          .newScheduledThreadPool(statuses.length + 5);
+      for (GroomServerStatus status : statuses) {
+        sched.schedule(new TaskWorker(status, numGroomServers, job), 0, SECONDS);
+      }
+      sched.shutdown(); // already-submitted zero-delay workers still run
+    }
   }
 
-  // removes job
-  public void removeJob(JobInProgress job) {
-    jobQueue.remove(job);
+  private class TaskWorker implements Runnable {
+    private final GroomServerStatus stus;
+    private final int groomNum;
+    private final JobInProgress jip;
+
+    TaskWorker(final GroomServerStatus stus, final int num,
+        final JobInProgress jip) {
+      this.stus = stus;
+      this.groomNum = num;
+      this.jip = jip;
+      if (null == this.stus)
+        throw new NullPointerException("Target groom server is not "
+            + "specified.");
+      if (-1 == this.groomNum)
+        throw new IllegalArgumentException("Groom number is not specified.");
+      if (null == this.jip)
+        throw new NullPointerException("No job is specified.");
+    }
+
+    public void run() {
+      // Ask the job for a task for this groom (the old scheduler null-checked it).
+      Task t = jip.obtainNewTask(this.stus, groomNum);
+      if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
+        // Without a task there is nothing to launch, so skip the dispatch.
+        if (null == t) return;
+        WorkerProtocol worker = groomServerManager.findGroomServer(this.stus);
+        try {
+          // dispatch() to the groom server
+          Directive d1 = new Directive(groomServerManager
+              .currentGroomServerPeers(),
+              new GroomServerAction[] { new LaunchTaskAction(t) });
+          worker.dispatch(d1);
+        } catch (IOException ioe) {
+          LOG.error("Fail to dispatch tasks to GroomServer "
+              + this.stus.getGroomName(), ioe);
+        }
+      } else {
+        LOG.warn("Currently master only shcedules job in running state. "
+            + "This may be refined in the future. JobId:" + jip.getJobID());
+      }
+    }
   }
 
-  @Override
-  public Collection<JobInProgress> getJobs() {
-    return jobQueue;
+  public SimpleTaskScheduler() {
+    this.jobListener = new JobListener();
+    this.jobProcessor = new JobProcessor();
   }
 
-  /*
-   * (non-Javadoc)
-   * @seeorg.apache.hama.bsp.TaskScheduler#assignTasks(org.apache.hama.bsp.
-   * GroomServerStatus)
-   */
   @Override
-  public synchronized List<Task> assignTasks(GroomServerStatus groomStatus)
-      throws IOException {
-    ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
-
-    final int numGroomServers = clusterStatus.getGroomServers();
-    // final int clusterTaskCapacity = clusterStatus.getMaxTasks();
-
-    // Get task counts for the current groom.
-    // final int groomTaskCapacity = groom.getMaxTasks();
-    final int groomRunningTasks = groomStatus.countTasks();
-
-    // Assigned tasks
-    List<Task> assignedTasks = new ArrayList<Task>();
-
-    if (groomRunningTasks == 0) {
-      // TODO - Each time a job is submitted in BSPMaster, add a JobInProgress
-      // instance to the scheduler.
-      synchronized (jobQueue) {
-        for (JobInProgress job : jobQueue) {
-          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-            continue;
-          }
-
-          Task t = null;
-          t = job.obtainNewTask(groomStatus, numGroomServers);
-
-          if (t != null) {
-            assignedTasks.add(t);
-            break; // TODO - Now, simple scheduler assigns only one task to
-            // each groom. Later, it will be improved for scheduler to
-            // assign one or more tasks to each groom according to
-            // its capacity.
-          }
-        }
+  public void start() {
+    this.queueManager = new QueueManager(getConf()); // TODO: need factory?
+    this.queueManager.createFCFSQueue(WAIT_QUEUE);
+    this.queueManager.createFCFSQueue(PROCESSING_QUEUE);
+    this.queueManager.createFCFSQueue(FINISHED_QUEUE);
+    groomServerManager.addJobInProgressListener(this.jobListener);
+    this.initialized = true;
+    this.jobProcessor.start();
+  }
 
-      }
-    }
+  @Override
+  public void terminate() {
+    this.initialized = false;
+    if (null != this.jobListener)
+      groomServerManager.removeJobInProgressListener(this.jobListener);
+  }
 
-    return assignedTasks;
+  @Override
+  public Collection<JobInProgress> getJobs(String queue) {
+    return (queueManager.findQueue(queue)).jobs();
+    // return jobQueue;
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Jan 12 12:32:39 2011
@@ -62,6 +62,10 @@ public abstract class Task implements Wr
   public String getJobFile() { 
     return jobFile; 
   }
+
+  public TaskAttemptID getTaskAttemptId(){
+    return this.taskId;
+  }
   
   public TaskAttemptID getTaskID() {
     return taskId;