You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2011/08/01 16:12:56 UTC

svn commit: r1152788 [4/9] - in /incubator/hama/trunk: ./ bin/ conf/ core/ core/bin/ core/conf/ core/src/ core/src/main/ core/src/main/java/ core/src/main/java/org/ core/src/main/java/org/apache/ core/src/main/java/org/apache/hama/ core/src/main/java/o...

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * JobInProgress maintains all the info for keeping a Job on the straight and
+ * narrow. It keeps its JobProfile and its latest JobStatus, plus the array of
+ * {@link TaskInProgress} objects used for bookkeeping of its tasks.
+ */
+class JobInProgress {
+
+  /**
+   * Used when a kill is issued to a job which is initializing.
+   */
+  static class KillInterruptedException extends InterruptedException {
+    private static final long serialVersionUID = 1L;
+
+    public KillInterruptedException(String msg) {
+      super(msg);
+    }
+  }
+
+  static final Log LOG = LogFactory.getLog(JobInProgress.class);
+  boolean tasksInited = false;
+  boolean jobInited = false;
+
+  Configuration conf;
+  JobProfile profile;
+  JobStatus status;
+  Path jobFile = null;
+  Path localJobFile = null;
+  Path localJarFile = null;
+  private LocalFileSystem localFs;
+  // Indicates how many times the job got restarted
+  private int restartCount;
+
+  long startTime;
+  // NOTE(review): launchTime is never assigned inside this class.
+  long launchTime;
+  long finishTime;
+
+  private String jobName;
+
+  private BSPJobID jobId;
+  final BSPMaster master;
+  TaskInProgress tasks[] = new TaskInProgress[0];
+  // Highest superstep count reported through updateTaskStatus() so far
+  private long superstepCounter;
+
+  int numBSPTasks = 0;
+  int clusterSize;
+
+  /**
+   * Creates the bookkeeping object for a submitted job. The job file (and the
+   * job jar, when the job names one) is copied to the local paths handed out
+   * by the master, and the profile and the initial PREP status are built from
+   * the job description.
+   * 
+   * @param jobId id of the job.
+   * @param jobFile location of the submitted job file.
+   * @param master the master this job belongs to.
+   * @param conf configuration used to resolve the file systems.
+   * @throws IOException if a file system cannot be obtained or a copy fails.
+   */
+  public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
+      Configuration conf) throws IOException {
+    this.conf = conf;
+    this.jobId = jobId;
+    this.localFs = FileSystem.getLocal(conf);
+    this.jobFile = jobFile;
+    this.master = master;
+    this.status = new JobStatus(jobId, null, 0L, 0L,
+        JobStatus.State.PREP.value());
+    this.startTime = System.currentTimeMillis();
+    this.superstepCounter = 0;
+    this.restartCount = 0;
+
+    this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+        + ".xml");
+    this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+        + ".jar");
+
+    Path jobDir = master.getSystemDirectoryForJob(jobId);
+    FileSystem fs = jobDir.getFileSystem(conf);
+    fs.copyToLocalFile(jobFile, localJobFile);
+    BSPJob job = new BSPJob(jobId, localJobFile.toString());
+    this.numBSPTasks = job.getNumBspTask();
+
+    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
+        job.getJobName());
+
+    // Assign the field directly: calling the overridable setter from a
+    // constructor would run subclass code on a half-built object.
+    this.jobName = job.getJobName();
+
+    status.setUsername(job.getUser());
+    status.setStartTime(startTime);
+
+    String jarFile = job.getJar();
+    if (jarFile != null) {
+      fs.copyToLocalFile(new Path(jarFile), localJarFile);
+    }
+
+  }
+
+  public JobProfile getProfile() {
+    return profile;
+  }
+
+  public JobStatus getStatus() {
+    return status;
+  }
+
+  public synchronized long getLaunchTime() {
+    return launchTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /**
+   * @return the number of desired tasks.
+   */
+  public int desiredBSPTasks() {
+    return numBSPTasks;
+  }
+
+  /**
+   * @return The JobID of this JobInProgress.
+   */
+  public BSPJobID getJobID() {
+    return jobId;
+  }
+
+  /**
+   * Looks up the task with the given id.
+   * 
+   * @param id the task id to search for.
+   * @return the matching task, or <code>null</code> if the tasks have not been
+   *         initialized yet or no task has that id.
+   */
+  public synchronized TaskInProgress findTaskInProgress(TaskID id) {
+    if (areTasksInited()) {
+      for (TaskInProgress tip : tasks) {
+        if (tip.getTaskId().equals(id)) {
+          return tip;
+        }
+      }
+    }
+    return null;
+  }
+
+  public synchronized boolean areTasksInited() {
+    return this.tasksInited;
+  }
+
+  @Override
+  public String toString() {
+    return "jobName:" + profile.getJobName() + "\n" + "submit user:"
+        + profile.getUser() + "\n" + "JobId:" + jobId + "\n" + "JobFile:"
+        + jobFile + "\n";
+  }
+
+  // ///////////////////////////////////////////////////
+  // Create/manage tasks
+  // ///////////////////////////////////////////////////
+
+  /**
+   * Creates one {@link TaskInProgress} per desired BSP task and moves the job
+   * to the RUNNING state. Calling it again after a successful initialization
+   * is a no-op.
+   * 
+   * @throws IOException declared for callers; propagated from task creation.
+   */
+  public synchronized void initTasks() throws IOException {
+    if (tasksInited) {
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("numBSPTasks: " + numBSPTasks);
+    }
+
+    // one TaskInProgress per requested BSP task
+    this.tasks = new TaskInProgress[numBSPTasks];
+    for (int i = 0; i < numBSPTasks; i++) {
+      tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
+          this.master, this.conf, this, i);
+    }
+
+    // Update job status
+    this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
+        0L, 0L, JobStatus.RUNNING);
+    // the fresh status object would otherwise report a start time of 0
+    this.status.setStartTime(this.startTime);
+
+    tasksInited = true;
+    LOG.debug("Job is initialized.");
+  }
+
+  /**
+   * Hands out the first task that is neither running nor complete.
+   * 
+   * @param status status of the groom server asking for work.
+   * @param clusterSize current cluster size, remembered on this job.
+   * @return a task to run, or <code>null</code> if the job is not RUNNING, no
+   *         task is available, or creating the task failed.
+   */
+  public synchronized Task obtainNewTask(GroomServerStatus status,
+      int clusterSize) {
+    this.clusterSize = clusterSize;
+
+    if (this.status.getRunState() != JobStatus.RUNNING) {
+      LOG.info("Cannot create task split for " + profile.getJobID());
+      return null;
+    }
+
+    try {
+      for (TaskInProgress tip : tasks) {
+        if (!tip.isRunning() && !tip.isComplete()) {
+          return tip.getTaskToRun(status);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Exception while obtaining new task!", e);
+    }
+    return null;
+  }
+
+  /**
+   * Records the successful end of a task attempt. When every task of the job
+   * is complete the job is marked SUCCEEDED and its files are cleaned up.
+   * 
+   * @param tip the task the attempt belongs to.
+   * @param status the final status reported for the attempt.
+   */
+  public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
+    TaskAttemptID taskid = status.getTaskId();
+    updateTaskStatus(tip, status);
+    LOG.info("Taskid '" + taskid + "' has finished successfully.");
+    tip.completed(taskid);
+
+    //
+    // If all tasks are complete, then the job is done!
+    //
+    for (TaskInProgress taskInProgress : tasks) {
+      if (!taskInProgress.isComplete()) {
+        return;
+      }
+    }
+    finishJob(JobStatus.SUCCEEDED, "Job successfully done.");
+  }
+
+  /**
+   * Records the failure of a task attempt. When every task of the job has
+   * failed the job is marked FAILED and its files are cleaned up.
+   * <p>
+   * Synchronized like {@link #completedTask(TaskInProgress, TaskStatus)}: both
+   * replace the job status and trigger the cleanup.
+   * 
+   * @param tip the task the attempt belongs to.
+   * @param status the final status reported for the attempt.
+   */
+  public synchronized void failedTask(TaskInProgress tip, TaskStatus status) {
+    TaskAttemptID taskid = status.getTaskId();
+    updateTaskStatus(tip, status);
+    LOG.info("Taskid '" + taskid + "' has failed.");
+    tip.terminated(taskid);
+
+    //
+    // If all tasks have failed, then the job has failed.
+    // NOTE(review): with a mix of failed and completed tasks neither this
+    // method nor completedTask() appears to finish the job - confirm intent.
+    //
+    for (TaskInProgress taskInProgress : tasks) {
+      if (!taskInProgress.isFailed()) {
+        return;
+      }
+    }
+    finishJob(JobStatus.FAILED, "Job failed.");
+  }
+
+  /**
+   * Moves the job into a terminal state: installs a new status carrying the
+   * superstep counter, stamps start and finish time, and cleans up job files.
+   * Callers must hold the lock on this object.
+   */
+  private void finishJob(int runState, String logMessage) {
+    this.status = new JobStatus(this.status.getJobID(),
+        this.profile.getUser(), superstepCounter, superstepCounter,
+        superstepCounter, runState, superstepCounter);
+    // the fresh status object would otherwise report a start time of 0
+    this.status.setStartTime(this.startTime);
+    this.finishTime = System.currentTimeMillis();
+    this.status.setFinishTime(this.finishTime);
+
+    LOG.debug(logMessage);
+
+    garbageCollect();
+  }
+
+  /**
+   * Forwards a status report to the task and remembers the highest superstep
+   * count seen so far.
+   * 
+   * @param tip the task to update.
+   * @param taskStatus the reported status.
+   */
+  public synchronized void updateTaskStatus(TaskInProgress tip,
+      TaskStatus taskStatus) {
+    tip.updateStatus(taskStatus); // update tip
+
+    if (superstepCounter < taskStatus.getSuperstepCount()) {
+      superstepCounter = taskStatus.getSuperstepCount();
+      // TODO Later, we have to update JobInProgress status here
+
+    }
+  }
+
+  /**
+   * Marks the job KILLED, kills all of its tasks and cleans up the job files.
+   * Does nothing if the job is already in the KILLED state.
+   */
+  public synchronized void kill() {
+    if (status.getRunState() != JobStatus.KILLED) {
+      this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
+          0L, 0L, 0L, JobStatus.KILLED);
+      // the fresh status object would otherwise report a start time of 0
+      this.status.setStartTime(this.startTime);
+      this.finishTime = System.currentTimeMillis();
+      this.status.setFinishTime(this.finishTime);
+      //
+      // kill all TIPs.
+      //
+      for (TaskInProgress tip : tasks) {
+        tip.kill();
+      }
+
+      garbageCollect();
+    }
+
+  }
+
+  /**
+   * The job is dead. We're now GC'ing it: the local copies of the job file and
+   * job jar are deleted, followed by the directory that holds the submitted
+   * job file. I/O errors are logged and otherwise ignored.
+   */
+  synchronized void garbageCollect() {
+    try {
+      // Definitely remove the local-disk copy of the job file
+      if (localJobFile != null) {
+        localFs.delete(localJobFile, true);
+        localJobFile = null;
+      }
+      if (localJarFile != null) {
+        localFs.delete(localJarFile, true);
+        localJarFile = null;
+      }
+
+      // JobClient always creates a new directory with job files
+      // so we remove that directory to cleanup
+      FileSystem fs = FileSystem.get(conf);
+      fs.delete(new Path(profile.getJobFile()).getParent(), true);
+
+    } catch (IOException e) {
+      LOG.info("Error cleaning up " + profile.getJobID() + ": " + e);
+    }
+  }
+
+  /**
+   * Get the number of times the job has restarted
+   */
+  int getNumRestarts() {
+    return restartCount;
+  }
+
+  /**
+   * @param jobName the jobName to set
+   */
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  /**
+   * @return the jobName
+   */
+  public String getJobName() {
+    return jobName;
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * Callback contract for components that want to be told when a
+ * {@link JobInProgress job} enters or leaves the {@link BSPMaster}.
+ */
+abstract class JobInProgressListener {
+
+  /**
+   * Notification that a new job has been added to the {@link BSPMaster}.
+   * 
+   * @param job the job that was added.
+   * @throws IOException if the implementation fails to process the
+   *           notification.
+   */
+  public abstract void jobAdded(JobInProgress job) throws IOException;
+
+  /**
+   * Notification that a job has been removed from the {@link BSPMaster}.
+   * 
+   * @param job the job that was removed.
+   * @throws IOException if the implementation fails to process the
+   *           notification.
+   */
+  public abstract void jobRemoved(JobInProgress job) throws IOException;
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobProfile.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobProfile.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobProfile.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobProfile.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Describes a submitted job: the submitting user, the job id, the location of
+ * the job file and the user-specified job name.
+ */
+public class JobProfile implements Writable {
+
+  static {
+    // Register a factory that hands out empty JobProfile instances.
+    WritableFactories.setFactory(JobProfile.class, new WritableFactory() {
+      @Override
+      public Writable newInstance() {
+        return new JobProfile();
+      }
+    });
+  }
+
+  String user;
+  final BSPJobID jobid;
+  String jobFile;
+  String name;
+
+  /**
+   * Construct an empty {@link JobProfile}; the fields are expected to be
+   * filled in by {@link #readFields(DataInput)}.
+   */
+  public JobProfile() {
+    this.jobid = new BSPJobID();
+  }
+
+  /**
+   * Construct a {@link JobProfile} from the userid, jobid, job config-file
+   * and job name.
+   * 
+   * @param user userid of the person who submitted the job.
+   * @param jobid id of the job.
+   * @param jobFile job configuration file.
+   * @param name user-specified job name.
+   */
+  public JobProfile(String user, BSPJobID jobid, String jobFile, String name) {
+    this.jobid = jobid;
+    this.user = user;
+    this.name = name;
+    this.jobFile = jobFile;
+  }
+
+  /**
+   * @return the id of the user who submitted the job.
+   */
+  public String getUser() {
+    return this.user;
+  }
+
+  /**
+   * @return the job id.
+   */
+  public BSPJobID getJobID() {
+    return this.jobid;
+  }
+
+  /**
+   * @return the configuration file for the job.
+   */
+  public String getJobFile() {
+    return this.jobFile;
+  }
+
+  /**
+   * @return the user-specified job name.
+   */
+  public String getJobName() {
+    return this.name;
+  }
+
+  /**
+   * Serializes the profile in the order: job id, job file, user, name.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    jobid.write(out);
+    Text.writeString(out, jobFile);
+    Text.writeString(out, user);
+    Text.writeString(out, name);
+  }
+
+  /**
+   * Restores the profile; reads the fields in the order used by
+   * {@link #write(DataOutput)}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    jobid.readFields(in);
+    jobFile = Text.readString(in);
+    user = Text.readString(in);
+    name = Text.readString(in);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Describes the current status of a job. The run state is kept both as an int
+ * constant ({@link #getRunState()}) and as a {@link State} enum value
+ * ({@link #getState()}); the two views are kept in sync.
+ */
+public class JobStatus implements Writable, Cloneable {
+  public static final Log LOG = LogFactory.getLog(JobStatus.class);
+
+  static {
+    // Register a factory that hands out empty JobStatus instances.
+    WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
+      @Override
+      public Writable newInstance() {
+        return new JobStatus();
+      }
+    });
+  }
+
+  /**
+   * Run states of a job. {@link #value()} matches the int constants declared
+   * on {@link JobStatus}.
+   */
+  public static enum State {
+    RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5);
+    int s;
+
+    State(int s) {
+      this.s = s;
+    }
+
+    public int value() {
+      return this.s;
+    }
+
+    /**
+     * @return the constant's name, except for PREP which is shown as "SETUP".
+     */
+    @Override
+    public String toString() {
+      return this == PREP ? "SETUP" : name();
+    }
+
+  }
+
+  public static final int RUNNING = 1;
+  public static final int SUCCEEDED = 2;
+  public static final int FAILED = 3;
+  public static final int PREP = 4;
+  public static final int KILLED = 5;
+
+  private BSPJobID jobid;
+  private long progress;
+  private long cleanupProgress;
+  private long setupProgress;
+  private volatile State state;// runState in enum
+  private int runState;
+  private long startTime;
+  private String schedulingInfo = "NA";
+  private String user;
+  private long superstepCount;
+  // not part of the serialized form, see write()/readFields()
+  private String name;
+
+  private long finishTime;
+
+  /**
+   * Creates an empty status, to be filled in by
+   * {@link #readFields(DataInput)}.
+   */
+  public JobStatus() {
+  }
+
+  public JobStatus(BSPJobID jobid, String user, long progress, int runState) {
+    this(jobid, user, progress, 0, runState);
+  }
+
+  public JobStatus(BSPJobID jobid, String user, long progress,
+      long cleanupProgress, int runState) {
+    this(jobid, user, 0, progress, cleanupProgress, runState);
+  }
+
+  public JobStatus(BSPJobID jobid, String user, long setupProgress,
+      long progress, long cleanupProgress, int runState) {
+    // forward setupProgress instead of silently replacing it with 0
+    this(jobid, user, setupProgress, progress, cleanupProgress, runState, 0);
+  }
+
+  /**
+   * Creates a fully specified status.
+   * 
+   * @param jobid id of the job.
+   * @param user name of the user who owns the job.
+   * @param setupProgress setup progress value.
+   * @param progress progress value.
+   * @param cleanupProgress cleanup progress value.
+   * @param runState one of the int run state constants of this class.
+   * @param superstepCount superstep count to report.
+   * @throws IllegalArgumentException if <code>runState</code> is not one of
+   *           the known run states.
+   */
+  public JobStatus(BSPJobID jobid, String user, long setupProgress,
+      long progress, long cleanupProgress, int runState, long superstepCount) {
+    State resolved = stateOf(runState);
+    if (resolved == null) {
+      throw new IllegalArgumentException("Unknown run state: " + runState);
+    }
+    this.jobid = jobid;
+    this.setupProgress = setupProgress;
+    this.progress = progress;
+    this.cleanupProgress = cleanupProgress;
+    this.runState = runState;
+    this.state = resolved;
+    this.superstepCount = superstepCount;
+    this.user = user;
+  }
+
+  /**
+   * Maps an int run state to its enum constant by comparing values rather
+   * than relying on declaration order.
+   * 
+   * @return the matching constant, or <code>null</code> if there is none.
+   */
+  private static State stateOf(int runState) {
+    for (State candidate : State.values()) {
+      if (candidate.value() == runState) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  public BSPJobID getJobID() {
+    return jobid;
+  }
+
+  public synchronized long progress() {
+    return progress;
+  }
+
+  synchronized void setprogress(long p) {
+    this.progress = p;
+  }
+
+  public synchronized long cleanupProgress() {
+    return cleanupProgress;
+  }
+
+  // takes a long like the field and the other progress setters; int
+  // arguments are still accepted through widening
+  synchronized void setCleanupProgress(long p) {
+    this.cleanupProgress = p;
+  }
+
+  public synchronized long setupProgress() {
+    return setupProgress;
+  }
+
+  synchronized void setSetupProgress(long p) {
+    this.setupProgress = p;
+  }
+
+  public JobStatus.State getState() {
+    return this.state;
+  }
+
+  /**
+   * Sets the run state from its enum form; the int run state follows along
+   * unless <code>state</code> is <code>null</code>.
+   */
+  public synchronized void setState(JobStatus.State state) {
+    this.state = state;
+    if (state != null) {
+      this.runState = state.value();
+    }
+  }
+
+  public synchronized int getRunState() {
+    return runState;
+  }
+
+  /**
+   * Sets the run state from its int form; the enum view becomes
+   * <code>null</code> if the value is not a known run state.
+   */
+  public synchronized void setRunState(int state) {
+    this.runState = state;
+    this.state = stateOf(state);
+  }
+
+  public synchronized long getSuperstepCount() {
+    return superstepCount;
+  }
+
+  public synchronized void setSuperstepCount(long superstepCount) {
+    this.superstepCount = superstepCount;
+  }
+
+  public synchronized void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public synchronized long getStartTime() {
+    return startTime;
+  }
+
+  public synchronized void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  /**
+   * Get the finish time of the job.
+   */
+  public synchronized long getFinishTime() {
+    return finishTime;
+  }
+
+  /**
+   * @param user The username of the job
+   */
+  public synchronized void setUsername(String user) {
+    this.user = user;
+  }
+
+  /**
+   * @return the username of the job
+   */
+  public synchronized String getUsername() {
+    return user;
+  }
+
+  @Override
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException cnse) {
+      throw new InternalError(cnse.toString());
+    }
+  }
+
+  public synchronized String getSchedulingInfo() {
+    return schedulingInfo;
+  }
+
+  public synchronized void setSchedulingInfo(String schedulingInfo) {
+    this.schedulingInfo = schedulingInfo;
+  }
+
+  /**
+   * @return <code>true</code> if the run state is SUCCEEDED, FAILED or KILLED.
+   */
+  public synchronized boolean isJobComplete() {
+    return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED);
+  }
+
+  /**
+   * Serializes the status. The job name is not written.
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    jobid.write(out);
+    out.writeLong(setupProgress);
+    out.writeLong(progress);
+    out.writeLong(cleanupProgress);
+    out.writeInt(runState);
+    out.writeLong(startTime);
+    out.writeLong(finishTime);
+    Text.writeString(out, user);
+    Text.writeString(out, schedulingInfo);
+    out.writeLong(superstepCount);
+  }
+
+  /**
+   * Restores the status in the order used by {@link #write(DataOutput)} and
+   * re-derives the enum view of the run state, which is not transmitted.
+   */
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    this.jobid = new BSPJobID();
+    jobid.readFields(in);
+    this.setupProgress = in.readLong();
+    this.progress = in.readLong();
+    this.cleanupProgress = in.readLong();
+    this.runState = in.readInt();
+    // without this getState() would stay null on a deserialized status
+    this.state = stateOf(this.runState);
+    this.startTime = in.readLong();
+    this.finishTime = in.readLong();
+    this.user = Text.readString(in);
+    this.schedulingInfo = Text.readString(in);
+    this.superstepCount = in.readLong();
+  }
+
+  /**
+   * @param name the name to set
+   */
+  public synchronized void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * @return the name
+   */
+  public synchronized String getName() {
+    return name;
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillJobAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillJobAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillJobAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillJobAction.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Directive sent by the {@link org.apache.hama.bsp.BSPMaster} to a
+ * {@link org.apache.hama.bsp.GroomServer} asking it to kill the tasks of a job
+ * and clean up its resources. The job is identified by its id in string form.
+ */
+class KillJobAction extends GroomServerAction {
+  String jobId;
+
+  /**
+   * Creates an action with an empty job id, to be filled in by
+   * {@link #readFields(DataInput)}.
+   */
+  public KillJobAction() {
+    this("");
+  }
+
+  /**
+   * @param killJobId id of the job to kill.
+   */
+  public KillJobAction(String killJobId) {
+    super(ActionType.KILL_JOB);
+    jobId = killJobId;
+  }
+
+  /**
+   * @return the id of the job to kill.
+   */
+  public String getJobID() {
+    return this.jobId;
+  }
+
+  /** Writes the job id as a string. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, this.jobId);
+  }
+
+  /** Reads the job id written by {@link #write(DataOutput)}. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.jobId = Text.readString(in);
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillTaskAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillTaskAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillTaskAction.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Directive sent by the {@link org.apache.hama.bsp.BSPMaster} to a
+ * {@link org.apache.hama.bsp.GroomServer} asking it to kill a single task
+ * attempt.
+ */
+class KillTaskAction extends GroomServerAction {
+  TaskAttemptID taskId;
+
+  /**
+   * Creates an action holding an empty attempt id, to be filled in by
+   * {@link #readFields(DataInput)}.
+   */
+  public KillTaskAction() {
+    this(new TaskAttemptID());
+  }
+
+  /**
+   * @param killTaskId id of the task attempt to kill.
+   */
+  public KillTaskAction(TaskAttemptID killTaskId) {
+    super(ActionType.KILL_TASK);
+    taskId = killTaskId;
+  }
+
+  /**
+   * @return the id of the task attempt to kill.
+   */
+  public TaskAttemptID getTaskID() {
+    return this.taskId;
+  }
+
+  /** Delegates serialization to the attempt id. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    this.taskId.write(out);
+  }
+
+  /** Delegates deserialization to the attempt id. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.taskId.readFields(in);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
+ */
+class LaunchTaskAction extends GroomServerAction {
+  private Task task;
+
+  /**
+   * Creates an action without a task; the task is created and populated by
+   * {@link #readFields(DataInput)}.
+   */
+  public LaunchTaskAction() {
+    super(ActionType.LAUNCH_TASK);
+  }
+
+  /**
+   * @param task the task to launch.
+   */
+  public LaunchTaskAction(Task task) {
+    super(ActionType.LAUNCH_TASK);
+    this.task = task;
+  }
+
+  /**
+   * @return the task to launch; <code>null</code> if this action was created
+   *         with the no-argument constructor and nothing has been read yet.
+   */
+  public Task getTask() {
+    return task;
+  }
+
+  /** Delegates serialization to the task. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    task.write(out);
+  }
+
+  /** Replaces the task with a fresh {@link BSPTask} read from the input. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    task = new BSPTask();
+    task.readFields(in);
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPMaster.State;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A multithreaded local BSP runner that can be used for debugging BSPs. It
+ * uses the working directory "/user/hama/bsp/" unless "bsp.local.dir" is set,
+ * and starts as many runners as "bsp.local.tasks.maximum" specifies (default 20).
+ * 
+ */
+public class LocalBSPRunner implements JobSubmissionProtocol {
+  public static final Log LOG = LogFactory.getLog(LocalBSPRunner.class);
+
+  // Prefix of every local groom name and identifier used for generated job ids.
+  private static final String IDENTIFIER = "localrunner";
+  // Default system directory; the constructor overwrites it when
+  // "bsp.local.dir" is configured. Static, so the change is visible to all
+  // instances.
+  private static String WORKING_DIR = "/user/hama/bsp/";
+  // Pool executing the BSPRunner tasks. Replaced by every constructor call and
+  // shut down by ThreadObserver once the observed futures have completed.
+  protected static volatile ThreadPoolExecutor threadPool;
+  // Number of pool threads, barrier parties and local grooms.
+  protected static int threadPoolSize;
+  // Futures of all submitted BSPRunner tasks. NOTE(review): entries are only
+  // ever added in this class, never removed, and the list is shared by all
+  // instances.
+  protected static final LinkedList<Future<BSP>> futureList = new LinkedList<Future<BSP>>();
+  // Barrier used by LocalGroom.sync() to line up all running tasks.
+  protected static CyclicBarrier barrier;
+
+  // Class-load defaults sized by the number of available processors; the
+  // constructor later replaces all three values from the configuration.
+  static {
+    threadPoolSize = Runtime.getRuntime().availableProcessors();
+    barrier = new CyclicBarrier(threadPoolSize);
+    threadPool = (ThreadPoolExecutor) Executors
+        .newFixedThreadPool(threadPoolSize);
+  }
+
+  // Local grooms keyed by their peer name (IDENTIFIER + " " + index).
+  protected HashMap<String, LocalGroom> localGrooms = new HashMap<String, LocalGroom>();
+  // Job file and job name of the most recently submitted job.
+  protected String jobFile;
+  protected String jobName;
+
+  // Status of the most recently submitted job; also created lazily by
+  // getJobStatus() when no job has been submitted yet.
+  protected JobStatus currentJobStatus;
+
+  protected Configuration conf;
+  // File system obtained from conf in the constructor.
+  protected FileSystem fs;
+
+  public LocalBSPRunner(Configuration conf) throws IOException {
+    super();
+    this.conf = conf;
+    this.fs = FileSystem.get(conf);
+    String path = conf.get("bsp.local.dir");
+    if (path != null && !path.isEmpty())
+      WORKING_DIR = path;
+
+    threadPoolSize = conf.getInt("bsp.local.tasks.maximum", 20);
+    threadPool = (ThreadPoolExecutor) Executors
+        .newFixedThreadPool(threadPoolSize);
+    barrier = new CyclicBarrier(threadPoolSize);
+
+    for (int i = 0; i < threadPoolSize; i++) {
+      String name = IDENTIFIER + " " + i;
+      localGrooms.put(name, new LocalGroom(name));
+    }
+
+  }
+
+  /**
+   * Returns the protocol version of this local implementation; both arguments
+   * are ignored and the constant 3 is returned.
+   */
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return 3;
+  }
+
+  /**
+   * Returns a job id built from the local runner identifier. The numeric part
+   * is always 1, so every call yields an equal-looking id.
+   */
+  @Override
+  public BSPJobID getNewJobId() throws IOException {
+    return new BSPJobID(IDENTIFIER, 1);
+  }
+
+  /**
+   * Submits a job for local execution: one groom and one BSP instance are
+   * created per task, each is handed to the thread pool, and an observer
+   * thread is started that updates the job status once the tasks are done.
+   * 
+   * @param jobID the id of the job.
+   * @param jobFile the path of the job file.
+   * @return the status of the job, initially RUNNING.
+   * @throws IOException if the job or a groom cannot be created.
+   */
+  @Override
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+    this.jobFile = jobFile;
+    final BSPJob bspJob = new BSPJob(jobID, jobFile);
+    bspJob.setNumBspTask(threadPoolSize);
+    this.jobName = bspJob.getJobName();
+    this.currentJobStatus = new JobStatus(jobID,
+        System.getProperty("user.name"), 0, JobStatus.RUNNING);
+
+    for (int taskIndex = 0; taskIndex < threadPoolSize; taskIndex++) {
+      final String groomName = IDENTIFIER + " " + taskIndex;
+      final LocalGroom groom = new LocalGroom(groomName);
+      localGrooms.put(groomName, groom);
+
+      final BSP bspInstance = ReflectionUtils.newInstance(
+          bspJob.getBspClass(), conf);
+      final Future<BSP> pending = threadPool.submit(new BSPRunner(conf,
+          bspJob, bspInstance, groom));
+      futureList.add(pending);
+    }
+
+    final Thread observer = new Thread(new ThreadObserver(currentJobStatus));
+    observer.start();
+    return currentJobStatus;
+  }
+
+  /**
+   * Builds a cluster status from the local grooms, mapping every groom key to
+   * its peer name. The task count and the maximum are both the pool size.
+   * 
+   * @param detailed ignored by the local runner.
+   * @return the status of the simulated cluster, always in state RUNNING.
+   */
+  @Override
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+    final Map<String, String> groomToPeer = new HashMap<String, String>();
+    for (String groomName : localGrooms.keySet()) {
+      final LocalGroom groom = localGrooms.get(groomName);
+      groomToPeer.put(groomName, groom.getPeerName());
+    }
+    final int tasks = threadPoolSize;
+    return new ClusterStatus(groomToPeer, tasks, tasks, State.RUNNING);
+  }
+
+  /**
+   * Creates a profile for the given job id from the current user name and the
+   * job file and job name of the most recently submitted job.
+   * 
+   * @param jobid the id to put into the profile.
+   * @return a newly created job profile.
+   */
+  @Override
+  public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
+    final String user = System.getProperty("user.name");
+    return new JobProfile(user, jobid, jobFile, jobName);
+  }
+
+  /**
+   * Returns the status of the current job. If no status exists yet, a new
+   * RUNNING status for the given id is created, stored and returned.
+   * 
+   * @param jobid the id used only when a new status has to be created.
+   * @return the current job status.
+   */
+  @Override
+  public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
+    if (currentJobStatus != null) {
+      return currentJobStatus;
+    }
+    currentJobStatus = new JobStatus(jobid, System.getProperty("user.name"),
+        0L, JobStatus.RUNNING);
+    return currentJobStatus;
+  }
+
+  /**
+   * @return the URI of the file system obtained in the constructor, as a
+   *         string.
+   */
+  @Override
+  public String getFilesystemName() throws IOException {
+    return fs.getUri().toString();
+  }
+
+  /**
+   * Not supported by the local runner.
+   * 
+   * @return always null (not an empty array), so callers must null-check.
+   */
+  @Override
+  public JobStatus[] jobsToComplete() throws IOException {
+    return null;
+  }
+
+  /**
+   * Not supported by the local runner.
+   * 
+   * @return always null (not an empty array), so callers must null-check.
+   */
+  @Override
+  public JobStatus[] getAllJobs() throws IOException {
+    return null;
+  }
+
+  /**
+   * @return the working directory, either the default or the value of
+   *         "bsp.local.dir" picked up by the constructor.
+   */
+  @Override
+  public String getSystemDir() {
+    return WORKING_DIR;
+  }
+
+  /**
+   * No-op: the local runner does not kill jobs.
+   */
+  @Override
+  public void killJob(BSPJobID jobid) throws IOException {
+    return;
+  }
+
+  /**
+   * No-op: the local runner does not kill tasks.
+   * 
+   * @return always false.
+   */
+  @Override
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException {
+    return false;
+  }
+
+  /**
+   * Executes a single BSP instance against its local groom. Instances are
+   * submitted to the thread pool as {@link Callable}s, so the outcome of the
+   * execution is observable through the returned {@link Future}.
+   */
+  class BSPRunner implements Callable<BSP> {
+
+    Configuration conf;
+    BSPJob job;
+    BSP bsp;
+    LocalGroom groom;
+
+    /**
+     * @param conf the configuration handed to the BSP before it runs.
+     * @param job the job this runner belongs to.
+     * @param bsp the BSP instance to execute.
+     * @param groom the local peer passed to the BSP.
+     */
+    public BSPRunner(Configuration conf, BSPJob job, BSP bsp, LocalGroom groom) {
+      super();
+      this.conf = conf;
+      this.job = job;
+      this.bsp = bsp;
+      this.groom = groom;
+    }
+
+    /**
+     * Runs the BSP and only logs a failure. Kept for callers that want the
+     * best-effort behaviour; {@link #call()} does not use it because a
+     * swallowed exception would make the task look successful.
+     */
+    public void run() {
+      bsp.setConf(conf);
+      try {
+         bsp.bsp(groom);
+      } catch (Exception e) {
+        LOG.error("Exception during BSP execution!", e);
+      }
+    }
+
+    /**
+     * Runs the BSP and lets any failure propagate, so that the Future of this
+     * task completes exceptionally and the observer can mark the job as
+     * failed. The observer logs the resulting ExecutionException, so the
+     * failure is not logged a second time here.
+     * 
+     * @return the executed BSP instance.
+     * @throws Exception whatever the BSP execution threw.
+     */
+    @Override
+    public BSP call() throws Exception {
+      bsp.setConf(conf);
+      bsp.bsp(groom);
+      return bsp;
+    }
+  }
+
+  /**
+   * Waits for all submitted runner futures and then records the outcome in the
+   * job status it was created for. Finally shuts the shared thread pool down.
+   */
+  class ThreadObserver implements Runnable {
+
+    JobStatus status;
+
+    /**
+     * @param currentJobStatus the status object to update once all tasks have
+     *          finished.
+     */
+    public ThreadObserver(JobStatus currentJobStatus) {
+      this.status = currentJobStatus;
+    }
+
+    @Override
+    public void run() {
+      boolean success = true;
+      boolean interrupted = false;
+      for (Future<BSP> future : futureList) {
+        try {
+          future.get();
+        } catch (InterruptedException e) {
+          LOG.error("Exception during BSP execution!", e);
+          success = false;
+          // remember the interrupt; it is restored after the loop so the
+          // remaining futures are still waited for as before
+          interrupted = true;
+        } catch (ExecutionException e) {
+          LOG.error("Exception during BSP execution!", e);
+          success = false;
+        }
+      }
+      // Update the status handed to the constructor rather than the outer
+      // currentJobStatus field, which may meanwhile refer to another job.
+      if (success) {
+        status.setState(JobStatus.State.SUCCEEDED);
+        status.setRunState(JobStatus.SUCCEEDED);
+      } else {
+        status.setState(JobStatus.State.FAILED);
+        status.setRunState(JobStatus.FAILED);
+      }
+      threadPool.shutdownNow();
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+  }
+
+  class LocalGroom extends BSPPeer {
+    private long superStepCount = 0;
+    private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
+    // outgoing queue
+    private final Map<String, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<BSPMessage>>();
+    private final String peerName;
+
+    public LocalGroom(String peerName) throws IOException {
+      this.peerName = peerName;
+    }
+
+    @Override
+    public void send(String peerName, BSPMessage msg) throws IOException {
+      if (this.peerName.equals(peerName)) {
+        put(msg);
+      } else {
+        // put this into a outgoing queue
+        if (outgoingQueues.get(peerName) == null) {
+          outgoingQueues.put(peerName, new ConcurrentLinkedQueue<BSPMessage>());
+        }
+        outgoingQueues.get(peerName).add(msg);
+      }
+    }
+
+    @Override
+    public void put(BSPMessage msg) throws IOException {
+      localMessageQueue.add(msg);
+    }
+
+    @Override
+    public BSPMessage getCurrentMessage() throws IOException {
+      return localMessageQueue.poll();
+    }
+
+    @Override
+    public int getNumCurrentMessages() {
+      return localMessageQueue.size();
+    }
+
+    @Override
+    public void sync() throws IOException, KeeperException,
+        InterruptedException {
+      // wait until all threads reach this barrier
+      barrierSync();
+      // send the messages
+      for (Entry<String, ConcurrentLinkedQueue<BSPMessage>> entry : outgoingQueues
+          .entrySet()) {
+        String peerName = entry.getKey();
+        for (BSPMessage msg : entry.getValue())
+          localGrooms.get(peerName).put(msg);
+      }
+      // clear the local outgoing queue
+      outgoingQueues.clear();
+      // sync again to avoid data inconsistency
+      barrierSync();
+      incrementSuperSteps();
+    }
+
+    private void barrierSync() throws InterruptedException {
+      try {
+        barrier.await();
+      } catch (BrokenBarrierException e) {
+        throw new InterruptedException("Barrier has been broken!" + e);
+      }
+    }
+
+    private void incrementSuperSteps() {
+      currentJobStatus.setprogress(superStepCount++);
+      currentJobStatus.setSuperstepCount(currentJobStatus.progress());
+    }
+
+    @Override
+    public long getSuperstepCount() {
+      return superStepCount;
+    }
+
+    @Override
+    public String getPeerName() {
+      return peerName;
+    }
+
+    @Override
+    public String[] getAllPeerNames() {
+      return localGrooms.keySet().toArray(
+          new String[localGrooms.keySet().size()]);
+    }
+
+    @Override
+    public void clear() {
+      localMessageQueue.clear();
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return 3;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void put(BSPMessageBundle messages) throws IOException {
+    }
+
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Messagable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Messagable.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Messagable.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Messagable.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * An interface for BSP message classes. It declares no methods and serves
+ * purely as a marker.
+ */
+public interface Messagable {
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A {@link Writable} holding a collection of peer names. The names are
+ * serialized as a count followed by one string per name.
+ */
+public class PeerNames implements Writable {
+  Collection<String> allPeers;
+  
+  /**
+   * Creates an empty instance, to be filled by {@link #readFields(DataInput)}.
+   */
+  public PeerNames() {
+    this.allPeers = new ArrayList<String>();
+  }
+  
+  /**
+   * @param allPeers the peer names to hold; the collection is used as is, not
+   *          copied.
+   */
+  public PeerNames(Collection<String> allPeers) {
+    this.allPeers = allPeers;
+  }
+  
+  /**
+   * @return the peer names currently held.
+   */
+  public Collection<String> getAllPeerNames() {
+    return allPeers;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(allPeers.size());
+    for (String peerName : allPeers) {
+      Text.writeString(out, peerName);
+    }
+  }
+
+  /**
+   * Replaces the held names with the ones read from the input. A fresh list is
+   * used so that a reused instance does not accumulate names from earlier
+   * reads and a collection supplied by the caller is never modified.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int peersNum = in.readInt();
+    Collection<String> peers = new ArrayList<String>();
+    for (int i = 0; i < peersNum; i++) {
+      peers.add(Text.readString(in));
+    }
+    this.allPeers = peers;
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Queue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Queue.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Queue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Queue.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+
+/**
+ * Job Queue interface.
+ * 
+ * @param <T> the type of the jobs held by the queue.
+ */
+public interface Queue<T>{
+
+  /**
+   * The queue name.
+   * @return the name of current queue.
+   */
+  String getName();
+
+  /**
+   * Add a job to a queue.
+   * @param job to be added to the queue.
+   */
+  void addJob(T job);
+
+  /**
+   * Remove the given job from the queue.
+   * @param job to be removed from the queue.
+   */
+  void removeJob(T job);
+
+  /**
+   * Remove a job from the queue and return it. Which job is chosen is up to
+   * the implementation.
+   * @return job that is removed from the queue.
+   */
+  T removeJob();
+
+  /**
+   * Return all data stored in this queue.
+   * @return Collection of jobs.
+   */
+  public Collection<T> jobs();
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A BSPJob Queue Manager. 
+ */
+public class QueueManager{
+
+  /** All known queues, keyed by queue name. */
+  private final ConcurrentMap<String, Queue<JobInProgress>> queues = 
+    new ConcurrentHashMap<String, Queue<JobInProgress>>();
+
+  /**
+   * @param conf currently unused.
+   */
+  public QueueManager(Configuration conf){ }
+
+  /**
+   * Initialize a job by initializing its tasks. An IOException raised during
+   * initialization is printed and not propagated.
+   * @param job required to be initialized.
+   */
+  public void initJob(JobInProgress job){
+    try{
+      //job.updateStatus();
+      job.initTasks();
+    }catch(IOException ioe){
+      ioe.printStackTrace();
+    }
+  }
+
+  /**
+   * Add a job to the specified queue. Nothing happens if no queue with that
+   * name exists.
+   * @param name of the queue.
+   * @param job to be added.
+   */
+  public void addJob(String name, JobInProgress job){
+    Queue<JobInProgress> queue = queues.get(name);
+    if(null != queue) queue.addJob(job);
+  }
+
+  /**
+   * Remove the given job from a designated queue. Nothing happens if no queue
+   * with that name exists.
+   * @param name from which a job is removed.
+   * @param job to be removed from the queue.
+   */
+  public void removeJob(String name, JobInProgress job){
+    Queue<JobInProgress> queue = queues.get(name);
+    if(null != queue) queue.removeJob(job);
+  }
+
+  /**
+   * Move a job from a queue to another. If the destination queue does not
+   * exist the job stays where it is, so a job is never lost by a move.
+   * @param from a queue a job is to be removed.
+   * @param to a queue a job is to be added.
+   * @param job to be moved.
+   */
+  public void moveJob(String from, String to, JobInProgress job){
+    synchronized(queues){
+      Queue<JobInProgress> target = queues.get(to);
+      if(null == target) return; // keep the job in its current queue
+      Queue<JobInProgress> source = queues.get(from);
+      if(null != source) source.removeJob(job);
+      target.addJob(job);
+    }  
+  }
+
+  /**
+   * Create a FCFS queue with the name provided. An already existing queue of
+   * that name is kept.
+   * @param name of the queue. 
+   */
+  public void createFCFSQueue(String name){
+    queues.putIfAbsent(name, new FCFSQueue(name));
+  }
+
+  /**
+   * Find Queue according to the name specified.
+   * @param name of the queue. 
+   * @return queue of JobInProgress, or null if no such queue exists.
+   */
+  public Queue<JobInProgress> findQueue(String name){
+     return queues.get(name);
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to reinitialize itself.
+ */
+class ReinitGroomAction extends GroomServerAction {
+
+  /**
+   * Creates an action of type REINIT_GROOM.
+   */
+  public ReinitGroomAction() {
+    super(ActionType.REINIT_GROOM);
+  }
+
+  /**
+   * Writes nothing: this action carries no payload of its own.
+   */
+  public void write(DataOutput out) throws IOException {
+  }
+
+  /**
+   * Reads nothing: this action carries no payload of its own.
+   */
+  public void readFields(DataInput in) throws IOException {
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Reports status of GroomServer.
+ */
+public class ReportGroomStatusDirective extends Directive implements Writable {
+  public static final Log LOG = LogFactory.getLog(ReportGroomStatusDirective.class);
+
+  /** The reported groom server status; null until set or read. */
+  private GroomServerStatus status;
+
+  /**
+   * Creates an empty directive, to be filled by {@link #readFields(DataInput)}.
+   */
+  public ReportGroomStatusDirective() {
+    super();
+  }
+
+  /**
+   * Creates a directive of type Response carrying the given status.
+   * 
+   * @param status the groom server status to report.
+   */
+  public ReportGroomStatusDirective(GroomServerStatus status) {
+    super(Directive.Type.Response);
+    this.status = status;
+  }
+
+  /**
+   * @return the groom server status carried by this directive.
+   */
+  public GroomServerStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Writes the directive fields of the superclass followed by the status.
+   */
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    status.write(out);
+  }
+
+  /**
+   * Reads the directive fields of the superclass and then a new status.
+   */
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    status = new GroomServerStatus();
+    status.readFields(in);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * <code>RunningJob</code> is the user-interface to query for details on a
+ * running BSP job.
+ * 
+ * <p>
+ * Clients can get hold of <code>RunningJob</code> via the {@link BSPJobClient}
+ * and then query the running-job for details such as name, configuration,
+ * progress etc.
+ * </p>
+ * 
+ * @see BSPJobClient
+ */
+public interface RunningJob {
+  /**
+   * Get the job identifier.
+   * 
+   * @return the job identifier.
+   */
+  public BSPJobID getID();
+
+  /**
+   * Get the name of the job.
+   * 
+   * @return the name of the job.
+   */
+  public String getJobName();
+
+  /**
+   * Get the path of the submitted job configuration.
+   * 
+   * @return the path of the submitted job configuration.
+   */
+  public String getJobFile();
+
+  /**
+   * Get the <i>progress</i> of the job's tasks.
+   * 
+   * NOTE(review): this was documented as a float between 0.0 and 1.0, but the
+   * return type is <code>long</code>, so a fraction cannot be returned. The
+   * local runner stores its superstep counter as progress; confirm the
+   * intended unit.
+   * 
+   * @return the progress of the job's tasks.
+   * @throws IOException
+   */
+  public long progress() throws IOException;
+
+  /**
+   * Check if the job is finished or not. This is a non-blocking call.
+   * 
+   * @return <code>true</code> if the job is complete, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isComplete() throws IOException;
+
+  /**
+   * Check if the job completed successfully.
+   * 
+   * @return <code>true</code> if the job succeeded, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isSuccessful() throws IOException;
+
+  /**
+   * Blocks until the job is complete.
+   * 
+   * @throws IOException
+   */
+  public void waitForCompletion() throws IOException;
+
+  /**
+   * Returns the current state of the Job. {@link JobStatus}
+   * 
+   * @return the current state of the job as an int code.
+   * @throws IOException
+   */
+  public int getJobState() throws IOException;
+
+  /**
+   * Kill the running job. Blocks until all job tasks have been killed as well.
+   * If the job is no longer running, it simply returns.
+   * 
+   * @throws IOException
+   */
+  public void killJob() throws IOException;
+
+  /**
+   * Kill indicated task attempt.
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @param shouldFail if true the task is failed and added to failed tasks
+   *          list, otherwise it is just killed, w/o affecting job failure
+   *          status.
+   * @throws IOException
+   */
+  public void killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException;
+
+  /**
+   * Get the superstep count of the job.
+   * 
+   * @return the superstep count of the job.
+   * @throws IOException
+   */
+  public long getSuperstepCount() throws IOException;
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * Interface for components that schedule jobs to GroomServer(s).
+ */
+public interface Schedulable{
+
+  /**
+   * Schedule job to designated GroomServer(s) immediately.
+   * @param job to be scheduled. 
+   * @param statuses of GroomServer(s).
+   * @throws IOException
+   */
+  void schedule(JobInProgress job, GroomServerStatus... statuses) 
+      throws IOException;
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.ipc.GroomProtocol;
+
+/**
+ * A simple task scheduler.
+ * <p>
+ * On {@link #start()} three queues are created through
+ * {@code QueueManager.createFCFSQueue}: a wait queue, a processing queue and
+ * a finished queue. Jobs announced through the registered
+ * {@link JobInProgressListener} are initialized and placed on the wait queue.
+ * A dedicated thread takes jobs off the wait queue, places them on the
+ * processing queue and submits one worker per GroomServer status to a thread
+ * pool. Jobs reported as removed are moved from the processing queue to the
+ * finished queue.
+ * <p>
+ * {@code groomServerManager} is not declared in this class; it is presumably
+ * inherited from {@link TaskScheduler} and set before {@link #start()} is
+ * called (TODO confirm).
+ */
+class SimpleTaskScheduler extends TaskScheduler {
+
+  private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);
+
+  /** Name of the queue holding jobs that have not been scheduled yet. */
+  public static final String WAIT_QUEUE = "waitQueue";
+  /** Name of the queue holding jobs that have been handed to workers. */
+  public static final String PROCESSING_QUEUE = "processingQueue";
+  /** Name of the queue that removed jobs are moved to. */
+  public static final String FINISHED_QUEUE = "finishedQueue";
+
+  private QueueManager queueManager;
+  /** Set by start(), cleared by terminate(); read by the processor thread. */
+  private volatile boolean initialized;
+  private JobListener jobListener;
+  private JobProcessor jobProcessor;
+
+  /**
+   * Listener that feeds the queues from job lifecycle notifications.
+   */
+  private class JobListener extends JobInProgressListener {
+    /**
+     * Initializes the job and appends it to the wait queue.
+     */
+    @Override
+    public void jobAdded(JobInProgress job) throws IOException {
+      queueManager.initJob(job); // init task
+      queueManager.addJob(WAIT_QUEUE, job);
+    }
+
+    /**
+     * Moves the job from the processing queue to the finished queue.
+     */
+    @Override
+    public void jobRemoved(JobInProgress job) throws IOException {
+      queueManager.moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
+    }
+  }
+
+  /**
+   * Thread that drains the wait queue and schedules each job it finds.
+   */
+  private class JobProcessor extends Thread implements Schedulable {
+    JobProcessor() {
+      super("JobProcess");
+    }
+
+    /**
+     * Main logic scheduling task to GroomServer(s). Also, it will move
+     * JobInProgress from Wait Queue to Processing Queue.
+     *
+     * @throws IllegalStateException if the scheduler has not been started
+     * @throws NullPointerException if the wait queue cannot be found
+     */
+    @Override
+    public void run() {
+      if (!initialized) {
+        throw new IllegalStateException("SimpleTaskScheduler initialization"
+            + " is not yet finished!");
+      }
+      while (initialized) {
+        Queue<JobInProgress> queue = queueManager.findQueue(WAIT_QUEUE);
+        if (null == queue) {
+          LOG.error(WAIT_QUEUE + " does not exist.");
+          throw new NullPointerException(WAIT_QUEUE + " does not exist.");
+        }
+        // move a job from the wait queue to the processing queue
+        // NOTE(review): removeJob() presumably blocks until a job is
+        // available; otherwise this loop would spin - confirm against the
+        // Queue implementation.
+        JobInProgress j = queue.removeJob();
+        queueManager.addJob(PROCESSING_QUEUE, j);
+        // schedule onto every GroomServer currently known to the manager
+        Collection<GroomServerStatus> glist = groomServerManager
+            .groomServerStatusKeySet();
+        schedule(j, glist.toArray(new GroomServerStatus[glist.size()]));
+      }
+    }
+
+    /**
+     * Schedule job to designated GroomServer(s) immediately.
+     * <p>
+     * One {@link TaskWorker} per status is submitted with zero delay to a
+     * pool created for this call.
+     *
+     * @param job Job to be scheduled.
+     * @param statuses Targeted GroomServer(s).
+     */
+    @Override
+    public void schedule(JobInProgress job, GroomServerStatus... statuses) {
+      ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
+      final int numGroomServers = clusterStatus.getGroomServers();
+      final ScheduledExecutorService sched = Executors
+          .newScheduledThreadPool(statuses.length + 5);
+      try {
+        for (GroomServerStatus status : statuses) {
+          sched
+              .schedule(new TaskWorker(status, numGroomServers, job), 0, SECONDS);
+        }// for
+      } finally {
+        // The pool is private to this call; without a shutdown its threads
+        // would stay alive forever. Tasks submitted above still run, since
+        // existing delayed tasks are executed after shutdown() by default.
+        sched.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Obtains one task of a job for one GroomServer and dispatches it there.
+   */
+  private class TaskWorker implements Runnable {
+    private final GroomServerStatus stus;
+    private final int groomNum;
+    private final JobInProgress jip;
+
+    /**
+     * @param stus status of the GroomServer the task is dispatched to
+     * @param num number of GroomServers, passed on when obtaining the task
+     * @param jip the job a task is obtained from
+     * @throws NullPointerException if stus or jip is null
+     * @throws IllegalArgumentException if num is -1
+     */
+    TaskWorker(final GroomServerStatus stus, final int num,
+        final JobInProgress jip) {
+      this.stus = stus;
+      this.groomNum = num;
+      this.jip = jip;
+      if (null == this.stus)
+        throw new NullPointerException("Target groom server is not "
+            + "specified.");
+      if (-1 == this.groomNum)
+        throw new IllegalArgumentException("Groom number is not specified.");
+      if (null == this.jip)
+        throw new NullPointerException("No job is specified.");
+    }
+
+    /**
+     * Obtains a new task and, if the job is in running state, sends it to
+     * the GroomServer wrapped in a {@link DispatchTasksDirective}. Dispatch
+     * failures are logged, not rethrown.
+     */
+    @Override
+    public void run() {
+      // obtain tasks
+      Task t = jip.obtainNewTask(this.stus, groomNum);
+
+      // assembly into actions
+      if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
+        if (null == t) {
+          // NOTE(review): assumes obtainNewTask() may yield no task for this
+          // groom - confirm. Dispatching a launch action without a task is
+          // never useful, so skip it.
+          LOG.warn("No task obtained for GroomServer "
+              + this.stus.getGroomName() + ". JobId:" + jip.getJobID());
+          return;
+        }
+        GroomProtocol worker = groomServerManager.findGroomServer(this.stus);
+        try {
+          // dispatch() to the groom server
+          Directive d1 = new DispatchTasksDirective(groomServerManager
+              .currentGroomServerPeers(), new GroomServerAction[] { 
+              new LaunchTaskAction(t)});
+          worker.dispatch(d1);
+        } catch (IOException ioe) {
+          LOG.error("Fail to dispatch tasks to GroomServer "
+              + this.stus.getGroomName(), ioe);
+        }
+      } else {
+        LOG.warn("Currently master only shcedules job in running state. "
+            + "This may be refined in the future. JobId:" + jip.getJobID());
+      }
+    }
+  }
+
+  /**
+   * Creates the listener and the processor thread; nothing is started or
+   * registered until {@link #start()}.
+   */
+  public SimpleTaskScheduler() {
+    this.jobListener = new JobListener();
+    this.jobProcessor = new JobProcessor();
+  }
+
+  /**
+   * Creates the three queues, registers the job listener and starts the
+   * processor thread.
+   */
+  @Override
+  public void start() {
+    this.queueManager = new QueueManager(getConf()); // TODO: need factory?
+    this.queueManager.createFCFSQueue(WAIT_QUEUE);
+    this.queueManager.createFCFSQueue(PROCESSING_QUEUE);
+    this.queueManager.createFCFSQueue(FINISHED_QUEUE);
+    groomServerManager.addJobInProgressListener(this.jobListener);
+    // must be set before the thread starts: run() refuses to work otherwise
+    this.initialized = true;
+    this.jobProcessor.start();
+  }
+
+  /**
+   * Clears the running flag and unregisters the job listener.
+   * <p>
+   * NOTE(review): the processor thread only re-checks the flag between
+   * iterations, so it may remain parked on the wait queue - confirm whether
+   * it also needs to be interrupted.
+   */
+  @Override
+  public void terminate() {
+    this.initialized = false;
+    if (null != this.jobListener)
+      groomServerManager.removeJobInProgressListener(this.jobListener);
+  }
+
+  /**
+   * Returns the jobs currently held by the named queue.
+   *
+   * @param queue name of the queue, e.g. {@link #WAIT_QUEUE}
+   * @return the jobs of that queue
+   */
+  @Override
+  public Collection<JobInProgress> getJobs(String queue) {
+    return (queueManager.findQueue(queue)).jobs();
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.ipc.BSPPeerProtocol;
+
+/**
+ * Abstract parent of every task. It carries the identity of a task (job id,
+ * job file, task attempt id, partition) and knows how to serialize that
+ * identity as a {@link Writable}.
+ */
+public abstract class Task implements Writable {
+  public static final Log LOG = LogFactory.getLog(Task.class);
+
+  // ------------------------------------------------------------------
+  // State
+  // ------------------------------------------------------------------
+
+  protected BSPJobID jobId;
+  protected String jobFile;
+  protected TaskAttemptID taskId;
+  protected int partition;
+
+  protected LocalDirAllocator lDirAlloc;
+
+  /**
+   * Creates a task with empty job and attempt ids, ready to be filled by
+   * {@link #readFields(DataInput)}.
+   */
+  public Task() {
+    this.jobId = new BSPJobID();
+    this.taskId = new TaskAttemptID();
+  }
+
+  /**
+   * Creates a task from its four identifying values.
+   *
+   * @param jobId id of the job
+   * @param jobFile the job file
+   * @param taskId id of this task attempt
+   * @param partition index of this task within the job
+   */
+  public Task(BSPJobID jobId, String jobFile, TaskAttemptID taskId,
+      int partition) {
+    this.partition = partition;
+    this.taskId = taskId;
+    this.jobFile = jobFile;
+    this.jobId = jobId;
+  }
+
+  // ------------------------------------------------------------------
+  // Getters and setters
+  // ------------------------------------------------------------------
+
+  /** Replaces the job file of this task. */
+  public void setJobFile(String jobFile) {
+    this.jobFile = jobFile;
+  }
+
+  /** @return the job file of this task */
+  public String getJobFile() {
+    return this.jobFile;
+  }
+
+  /** @return the id of this task attempt */
+  public TaskAttemptID getTaskAttemptId() {
+    return taskId;
+  }
+
+  /** @return the id of this task attempt */
+  public TaskAttemptID getTaskID() {
+    return this.taskId;
+  }
+
+  /**
+   * Get the id of the job this task is part of.
+   * 
+   * @return the job id
+   */
+  public BSPJobID getJobID() {
+    return this.jobId;
+  }
+
+  /**
+   * Get the index of this task within the job.
+   * 
+   * @return the partition number
+   */
+  public int getPartition() {
+    return this.partition;
+  }
+
+  /** @return the string form of the task attempt id */
+  @Override
+  public String toString() {
+    return this.taskId.toString();
+  }
+
+  // ------------------------------------------------------------------
+  // Writable: job id, job file, task attempt id, partition - in that order
+  // ------------------------------------------------------------------
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    this.jobId.write(out);
+    Text.writeString(out, this.jobFile);
+    this.taskId.write(out);
+    out.writeInt(this.partition);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.jobId.readFields(in);
+    this.jobFile = Text.readString(in);
+    this.taskId.readFields(in);
+    this.partition = in.readInt();
+  }
+
+  /**
+   * Run this task as a part of the named job. This method is executed in the
+   * child process.
+   * 
+   * @param job the job this task runs as a part of
+   * @param bspPeer for communications
+   * @param umbilical for communications with GroomServer
+   * @throws IOException if running the task fails
+   */
+  public abstract void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
+      throws IOException;
+
+  /**
+   * Creates the runner for this task.
+   *
+   * @param groom the GroomServer the runner is created for
+   */
+  public abstract BSPTaskRunner createRunner(GroomServer groom);
+
+  /**
+   * Reports this task attempt as done over the given umbilical.
+   *
+   * @param umbilical protocol to report through
+   * @throws IOException if the report fails
+   */
+  public void done(BSPPeerProtocol umbilical) throws IOException {
+    final TaskAttemptID finished = getTaskID();
+    umbilical.done(finished, true);
+  }
+
+  /** @return the job configuration of this task */
+  public abstract BSPJob getConf();
+
+  /** Sets the job configuration of this task. */
+  public abstract void setConf(BSPJob localJobConf);
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Context of a single task attempt: the job context of the attempt's job
+ * plus the attempt id and a free-form status message.
+ */
+public class TaskAttemptContext extends BSPJobContext implements Progressable {
+  private final TaskAttemptID taskId;
+  private String status = "";
+
+  /**
+   * @param conf configuration handed to the job context
+   * @param taskId id of the attempt; its job id is handed to the job context
+   */
+  public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+    super(conf, taskId.getJobID());
+    this.taskId = taskId;
+  }
+
+  /**
+   * @return the id of this task attempt
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return this.taskId;
+  }
+
+  /**
+   * Replaces the status message of the task.
+   *
+   * @param msg the new status message
+   * @throws IOException never thrown by this implementation
+   */
+  public void setStatus(String msg) throws IOException {
+    this.status = msg;
+  }
+
+  /**
+   * @return the status message set last, or the empty string if none was set
+   */
+  public String getStatus() {
+    return this.status;
+  }
+
+  /**
+   * Report progress. Does nothing here; subtypes are expected to do the
+   * actual work.
+   */
+  @Override
+  public void progress() {
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * TaskAttemptID is a unique identifier for a task attempt. It consists of
+ * the {@link TaskID} of the task and the number of the attempt.
+ */
+public class TaskAttemptID extends ID {
+  protected static final String ATTEMPT = "attempt";
+  private TaskID taskId;
+
+  /**
+   * @param taskId id of the task this attempt belongs to
+   * @param id number of the attempt
+   * @throws IllegalArgumentException if taskId is null
+   */
+  public TaskAttemptID(TaskID taskId, int id) {
+    super(id);
+    if (taskId == null) {
+      throw new IllegalArgumentException("taskId cannot be null");
+    }
+    this.taskId = taskId;
+  }
+
+  /**
+   * @param jtIdentifier identifier passed on to the {@link TaskID}
+   * @param jobId job number
+   * @param taskId task number
+   * @param id number of the attempt
+   */
+  public TaskAttemptID(String jtIdentifier, int jobId, int taskId, int id) {
+    this(new TaskID(jtIdentifier, jobId, taskId), id);
+  }
+
+  /** Creates an empty id, to be filled by {@link #readFields(DataInput)}. */
+  public TaskAttemptID() {
+    taskId = new TaskID();
+  }
+
+  /** @return the id of the job this attempt belongs to */
+  public BSPJobID getJobID() {
+    return taskId.getJobID();
+  }
+
+  /** @return the id of the task this attempt belongs to */
+  public TaskID getTaskID() {
+    return taskId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    // NOTE(review): the cast below relies on ID.equals() rejecting null and
+    // objects of another class - confirm against ID.
+    if (!super.equals(o))
+      return false;
+
+    TaskAttemptID that = (TaskAttemptID) o;
+    return this.taskId.equals(that.taskId);
+  }
+
+  /**
+   * Appends the task part followed by the separator and the attempt number.
+   *
+   * @param builder the builder to append to
+   * @return the builder passed in
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return taskId.appendTo(builder).append(SEPARATOR).append(id);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    taskId.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    taskId.write(out);
+  }
+
+  @Override
+  public int hashCode() {
+    return taskId.hashCode() * 5 + id;
+  }
+
+  /**
+   * Orders by task id first and by attempt number second.
+   */
+  @Override
+  public int compareTo(ID o) {
+    TaskAttemptID that = (TaskAttemptID) o;
+    int tipComp = this.taskId.compareTo(that.taskId);
+    if (tipComp != 0) {
+      return tipComp;
+    }
+    // compare explicitly: a subtraction overflows for operands of opposite
+    // sign and would then report the wrong order
+    return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(ATTEMPT)).toString();
+  }
+
+  /**
+   * Parses a string of five separator-delimited parts: the literal
+   * {@code attempt}, an identifier, the job number, the task number and the
+   * attempt number.
+   *
+   * @param str the string to parse, may be null
+   * @return the parsed id, or null if str is null
+   * @throws IllegalArgumentException if str is not properly formed; a
+   *           parsing failure, if any, is attached as the cause
+   */
+  public static TaskAttemptID forName(String str)
+      throws IllegalArgumentException {
+    if (str == null)
+      return null;
+    Exception cause = null;
+    try {
+      String[] parts = str.split(Character.toString(SEPARATOR));
+      if (parts.length == 5) {
+        if (parts[0].equals(ATTEMPT)) {
+          return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
+              Integer.parseInt(parts[3]), Integer.parseInt(parts[4]));
+        }
+      }
+    } catch (Exception ex) {
+      // fall below, but keep the reason for the caller
+      cause = ex;
+    }
+    throw new IllegalArgumentException("TaskAttemptId string : " + str
+        + " is not properly formed", cause);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskID.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskID.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskID.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+/**
+ * TaskID represents the immutable and unique identifier for a BSP Task. It
+ * consists of the {@link BSPJobID} of the job and the number of the task.
+ */
+public class TaskID extends ID {
+  protected static final String TASK = "task";
+  /** Renders task numbers without grouping and with at least six digits. */
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+
+  private BSPJobID jobId;
+
+  /**
+   * @param jobId id of the job this task belongs to
+   * @param id number of the task
+   * @throws IllegalArgumentException if jobId is null
+   */
+  public TaskID(BSPJobID jobId, int id) {
+    super(id);
+    if (jobId == null) {
+      throw new IllegalArgumentException("jobId cannot be null");
+    }
+    this.jobId = jobId;
+  }
+
+  /**
+   * @param jtIdentifier identifier passed on to the {@link BSPJobID}
+   * @param jobId job number
+   * @param id number of the task
+   */
+  public TaskID(String jtIdentifier, int jobId, int id) {
+    this(new BSPJobID(jtIdentifier, jobId), id);
+  }
+
+  /** Creates an empty id, to be filled by {@link #readFields(DataInput)}. */
+  public TaskID() {
+    jobId = new BSPJobID();
+  }
+
+  /** Returns the {@link BSPJobID} object that this tip belongs to */
+  public BSPJobID getJobID() {
+    return jobId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    // NOTE(review): the cast below relies on ID.equals() rejecting null and
+    // objects of another class - confirm against ID.
+    if (!super.equals(o))
+      return false;
+
+    TaskID that = (TaskID) o;
+    return this.jobId.equals(that.jobId);
+  }
+
+  /**
+   * Orders by job id first and by task number second.
+   */
+  @Override
+  public int compareTo(ID o) {
+    TaskID that = (TaskID) o;
+    int jobComp = this.jobId.compareTo(that.jobId);
+    if (jobComp != 0) {
+      return jobComp;
+    }
+    // compare explicitly: a subtraction overflows for operands of opposite
+    // sign and would then report the wrong order
+    return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(TASK)).toString();
+  }
+
+  /**
+   * Appends the job part followed by the separator and the formatted task
+   * number.
+   *
+   * @param builder the builder to append to
+   * @return the builder passed in
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return jobId.appendTo(builder).append(SEPARATOR)
+        .append(idFormat.format(id));
+  }
+
+  @Override
+  public int hashCode() {
+    return jobId.hashCode() * 524287 + id;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    jobId.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    jobId.write(out);
+  }
+
+  /**
+   * Parses a task id string. The expected form has four parts separated by
+   * {@code _}: the literal {@code task}, an identifier, the job number and
+   * the task number. This presumably is what {@link #toString()} produces,
+   * assuming {@code BSPJobID.appendTo} contributes the identifier and the
+   * job number (TODO confirm). A five-part form, whose fourth part is
+   * ignored and whose fifth part is the task number, is still accepted for
+   * backward compatibility.
+   *
+   * @param str the string to parse, may be null
+   * @return the parsed id, or null if str is null
+   * @throws IllegalArgumentException if str is not properly formed; a
+   *           parsing failure, if any, is attached as the cause
+   */
+  public static TaskID forName(String str) throws IllegalArgumentException {
+    if (str == null)
+      return null;
+    Exception cause = null;
+    try {
+      String[] parts = str.split("_");
+      if ((parts.length == 4 || parts.length == 5) && parts[0].equals(TASK)) {
+        // the task number is always the last part
+        return new TaskID(parts[1], Integer.parseInt(parts[2]), Integer
+            .parseInt(parts[parts.length - 1]));
+      }
+    } catch (Exception ex) {
+      // fall below, but keep the reason for the caller
+      cause = ex;
+    }
+    throw new IllegalArgumentException("TaskId string : " + str
+        + " is not properly formed", cause);
+  }
+}