You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2011/08/01 16:12:56 UTC
svn commit: r1152788 [4/9] - in /incubator/hama/trunk: ./ bin/ conf/ core/
core/bin/ core/conf/ core/src/ core/src/main/ core/src/main/java/
core/src/main/java/org/ core/src/main/java/org/apache/
core/src/main/java/org/apache/hama/ core/src/main/java/o...
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * JobInProgress maintains all the info for keeping a Job on the straight and
+ * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
+ * tables for doing bookkeeping of its Tasks.ss
+ */
+class JobInProgress {
+
+ /**
+ * Used when the a kill is issued to a job which is initializing.
+ */
+ static class KillInterruptedException extends InterruptedException {
+ private static final long serialVersionUID = 1L;
+
+ public KillInterruptedException(String msg) {
+ super(msg);
+ }
+ }
+
+ static final Log LOG = LogFactory.getLog(JobInProgress.class);
+ boolean tasksInited = false;
+ boolean jobInited = false;
+
+ Configuration conf;
+ JobProfile profile;
+ JobStatus status;
+ Path jobFile = null;
+ Path localJobFile = null;
+ Path localJarFile = null;
+ private LocalFileSystem localFs;
+ // Indicates how many times the job got restarted
+ private int restartCount;
+
+ long startTime;
+ long launchTime;
+ long finishTime;
+
+ private String jobName;
+
+ // private LocalFileSystem localFs;
+ private BSPJobID jobId;
+ final BSPMaster master;
+ TaskInProgress tasks[] = new TaskInProgress[0];
+ private long superstepCounter;
+
+ int numBSPTasks = 0;
+ int clusterSize;
+
+ public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
+ Configuration conf) throws IOException {
+ this.conf = conf;
+ this.jobId = jobId;
+ this.localFs = FileSystem.getLocal(conf);
+ this.jobFile = jobFile;
+ this.master = master;
+ this.status = new JobStatus(jobId, null, 0L, 0L,
+ JobStatus.State.PREP.value());
+ this.startTime = System.currentTimeMillis();
+ this.superstepCounter = 0;
+ this.restartCount = 0;
+
+ this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+ + ".xml");
+ this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+ + ".jar");
+
+ Path jobDir = master.getSystemDirectoryForJob(jobId);
+ FileSystem fs = jobDir.getFileSystem(conf);
+ fs.copyToLocalFile(jobFile, localJobFile);
+ BSPJob job = new BSPJob(jobId, localJobFile.toString());
+ this.numBSPTasks = job.getNumBspTask();
+
+ this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
+ job.getJobName());
+
+ this.setJobName(job.getJobName());
+
+ status.setUsername(job.getUser());
+ status.setStartTime(startTime);
+
+ String jarFile = job.getJar();
+ if (jarFile != null) {
+ fs.copyToLocalFile(new Path(jarFile), localJarFile);
+ }
+
+ }
+
+ public JobProfile getProfile() {
+ return profile;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+
+ public synchronized long getLaunchTime() {
+ return launchTime;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ /**
+ * @return the number of desired tasks.
+ */
+ public int desiredBSPTasks() {
+ return numBSPTasks;
+ }
+
+ /**
+ * @return The JobID of this JobInProgress.
+ */
+ public BSPJobID getJobID() {
+ return jobId;
+ }
+
+ public synchronized TaskInProgress findTaskInProgress(TaskID id) {
+ if (areTasksInited()) {
+ for (TaskInProgress tip : tasks) {
+ if (tip.getTaskId().equals(id)) {
+ return tip;
+ }
+ }
+ }
+ return null;
+ }
+
+ public synchronized boolean areTasksInited() {
+ return this.tasksInited;
+ }
+
+ public String toString() {
+ return "jobName:" + profile.getJobName() + "\n" + "submit user:"
+ + profile.getUser() + "\n" + "JobId:" + jobId + "\n" + "JobFile:"
+ + jobFile + "\n";
+ }
+
+ // ///////////////////////////////////////////////////
+ // Create/manage tasks
+ // ///////////////////////////////////////////////////
+
+ public synchronized void initTasks() throws IOException {
+ if (tasksInited) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("numBSPTasks: " + numBSPTasks);
+ }
+
+ // adjust number of map tasks to actual number of splits
+ this.tasks = new TaskInProgress[numBSPTasks];
+ for (int i = 0; i < numBSPTasks; i++) {
+ tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
+ this.master, this.conf, this, i);
+ }
+
+ // Update job status
+ this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
+ 0L, 0L, JobStatus.RUNNING);
+
+ tasksInited = true;
+ LOG.debug("Job is initialized.");
+ }
+
+ public synchronized Task obtainNewTask(GroomServerStatus status,
+ int clusterSize) {
+ this.clusterSize = clusterSize;
+
+ if (this.status.getRunState() != JobStatus.RUNNING) {
+ LOG.info("Cannot create task split for " + profile.getJobID());
+ return null;
+ }
+
+ Task result = null;
+ try {
+ for (int i = 0; i < tasks.length; i++) {
+ if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
+ result = tasks[i].getTaskToRun(status);
+ break;
+ }
+ }
+
+ } catch (IOException e) {
+ LOG.error("Exception while obtaining new task!", e);
+ }
+ return result;
+ }
+
+ public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
+ TaskAttemptID taskid = status.getTaskId();
+ updateTaskStatus(tip, status);
+ LOG.info("Taskid '" + taskid + "' has finished successfully.");
+ tip.completed(taskid);
+
+ //
+ // If all tasks are complete, then the job is done!
+ //
+
+ boolean allDone = true;
+ for (TaskInProgress taskInProgress : tasks) {
+ if (!taskInProgress.isComplete()) {
+ allDone = false;
+ break;
+ }
+ }
+
+ if (allDone) {
+ this.status = new JobStatus(this.status.getJobID(),
+ this.profile.getUser(), superstepCounter, superstepCounter,
+ superstepCounter, JobStatus.SUCCEEDED, superstepCounter);
+ this.finishTime = System.currentTimeMillis();
+ this.status.setFinishTime(this.finishTime);
+
+ LOG.debug("Job successfully done.");
+
+ garbageCollect();
+ }
+ }
+
+ public void failedTask(TaskInProgress tip, TaskStatus status) {
+ TaskAttemptID taskid = status.getTaskId();
+ updateTaskStatus(tip, status);
+ LOG.info("Taskid '" + taskid + "' has failed.");
+ tip.terminated(taskid);
+
+ //
+ // If all tasks are complete, then the job is done!
+ //
+
+ boolean allDone = true;
+ for (TaskInProgress taskInProgress : tasks) {
+ if (!taskInProgress.isFailed()) {
+ allDone = false;
+ break;
+ }
+ }
+
+ if (allDone) {
+ this.status = new JobStatus(this.status.getJobID(),
+ this.profile.getUser(), superstepCounter, superstepCounter,
+ superstepCounter, JobStatus.FAILED, superstepCounter);
+ this.finishTime = System.currentTimeMillis();
+ this.status.setFinishTime(this.finishTime);
+
+ LOG.debug("Job failed.");
+
+ garbageCollect();
+ }
+ }
+
+ public synchronized void updateTaskStatus(TaskInProgress tip,
+ TaskStatus taskStatus) {
+ tip.updateStatus(taskStatus); // update tip
+
+ if (superstepCounter < taskStatus.getSuperstepCount()) {
+ superstepCounter = taskStatus.getSuperstepCount();
+ // TODO Later, we have to update JobInProgress status here
+
+ }
+ }
+
+ public synchronized void kill() {
+ if (status.getRunState() != JobStatus.KILLED) {
+ this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
+ 0L, 0L, 0L, JobStatus.KILLED);
+ this.finishTime = System.currentTimeMillis();
+ this.status.setFinishTime(this.finishTime);
+ //
+ // kill all TIPs.
+ //
+ for (int i = 0; i < tasks.length; i++) {
+ tasks[i].kill();
+ }
+
+ garbageCollect();
+ }
+
+ }
+
+ /**
+ * The job is dead. We're now GC'ing it, getting rid of the job from all
+ * tables. Be sure to remove all of this job's tasks from the various tables.
+ */
+ synchronized void garbageCollect() {
+ try {
+ // Definitely remove the local-disk copy of the job file
+ if (localJobFile != null) {
+ localFs.delete(localJobFile, true);
+ localJobFile = null;
+ }
+ if (localJarFile != null) {
+ localFs.delete(localJarFile, true);
+ localJarFile = null;
+ }
+
+ // JobClient always creates a new directory with job files
+ // so we remove that directory to cleanup
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(new Path(profile.getJobFile()).getParent(), true);
+
+ } catch (IOException e) {
+ LOG.info("Error cleaning up " + profile.getJobID() + ": " + e);
+ }
+ }
+
+ /**
+ * Get the number of times the job has restarted
+ */
+ int getNumRestarts() {
+ return restartCount;
+ }
+
+ /**
+ * @param jobName the jobName to set
+ */
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ /**
+ * @return the jobName
+ */
+ public String getJobName() {
+ return jobName;
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * A listener for changes in a {@link JobInProgress job}'s lifecycle in the
+ * {@link BSPMaster}.
+ */
+abstract class JobInProgressListener {
+
+ /**
+ * Invoked when a new job has been added to the {@link BSPMaster}.
+ * @param job The job to be added.
+ * @throws IOException
+ */
+ public abstract void jobAdded(JobInProgress job) throws IOException;
+
+ /**
+ * Invoked when a job has been removed from the {@link BSPMaster}.
+ * @param job The job to be removed .
+ * @throws IOException
+ */
+ public abstract void jobRemoved(JobInProgress job) throws IOException;
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobProfile.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobProfile.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobProfile.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobProfile.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * A JobProfile tracks job's status.
+ */
+public class JobProfile implements Writable {
+
+ static { // register actor
+ WritableFactories.setFactory(JobProfile.class, new WritableFactory() {
+ public Writable newInstance() {
+ return new JobProfile();
+ }
+ });
+ }
+
+ String user;
+ final BSPJobID jobid;
+ String jobFile;
+ String name;
+
+ /**
+ * Construct an empty {@link JobProfile}.
+ */
+ public JobProfile() {
+ jobid = new BSPJobID();
+ }
+
+ /**
+ * Construct a {@link JobProfile} the userid, jobid, job config-file,
+ * job-details url and job name.
+ *
+ * @param user userid of the person who submitted the job.
+ * @param jobid id of the job.
+ * @param jobFile job configuration file.
+ * @param name user-specified job name.
+ */
+ public JobProfile(String user, BSPJobID jobid, String jobFile, String name) {
+ this.user = user;
+ this.jobid = jobid;
+ this.jobFile = jobFile;
+ this.name = name;
+ }
+
+ /**
+ * Get the user id.
+ */
+ public String getUser() {
+ return user;
+ }
+
+ /**
+ * Get the job id.
+ */
+ public BSPJobID getJobID() {
+ return jobid;
+ }
+
+ /**
+ * Get the configuration file for the job.
+ */
+ public String getJobFile() {
+ return jobFile;
+ }
+
+ /**
+ * Get the user-specified job name.
+ */
+ public String getJobName() {
+ return name;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ jobid.write(out);
+ Text.writeString(out, jobFile);
+ Text.writeString(out, user);
+ Text.writeString(out, name);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ jobid.readFields(in);
+ this.jobFile = Text.readString(in);
+ this.user = Text.readString(in);
+ this.name = Text.readString(in);
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Describes the current status of a job.
+ */
+public class JobStatus implements Writable, Cloneable {
+ public static final Log LOG = LogFactory.getLog(JobStatus.class);
+
+ static {
+ WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
+ public Writable newInstance() {
+ return new JobStatus();
+ }
+ });
+ }
+
+ public static enum State {
+ RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5);
+ int s;
+
+ State(int s) {
+ this.s = s;
+ }
+
+ public int value() {
+ return this.s;
+ }
+
+ @Override
+ public String toString() {
+ String name = null;
+ switch (this) {
+ case RUNNING:
+ name = "RUNNING";
+ break;
+ case SUCCEEDED:
+ name = "SUCCEEDED";
+ break;
+ case FAILED:
+ name = "FAILED";
+ break;
+ case PREP:
+ name = "SETUP";
+ break;
+ case KILLED:
+ name = "KILLED";
+ break;
+ }
+
+ return name;
+ }
+
+ }
+
+ public static final int RUNNING = 1;
+ public static final int SUCCEEDED = 2;
+ public static final int FAILED = 3;
+ public static final int PREP = 4;
+ public static final int KILLED = 5;
+
+ private BSPJobID jobid;
+ private long progress;
+ private long cleanupProgress;
+ private long setupProgress;
+ private volatile State state;// runState in enum
+ private int runState;
+ private long startTime;
+ private String schedulingInfo = "NA";
+ private String user;
+ private long superstepCount;
+ private String name;
+
+ private long finishTime;
+
+ public JobStatus() {
+ }
+
+ public JobStatus(BSPJobID jobid, String user, long progress, int runState) {
+ this(jobid, user, progress, 0, runState);
+ }
+
+ public JobStatus(BSPJobID jobid, String user, long progress,
+ long cleanupProgress, int runState) {
+ this(jobid, user, 0, progress, cleanupProgress, runState);
+ }
+
+ public JobStatus(BSPJobID jobid, String user, long setupProgress,
+ long progress, long cleanupProgress, int runState) {
+ this(jobid, user, 0, progress, cleanupProgress, runState, 0);
+ }
+
+ public JobStatus(BSPJobID jobid, String user, long setupProgress,
+ long progress, long cleanupProgress, int runState, long superstepCount) {
+ this.jobid = jobid;
+ this.setupProgress = setupProgress;
+ this.progress = progress;
+ this.cleanupProgress = cleanupProgress;
+ this.runState = runState;
+ this.state = State.values()[runState - 1];
+ this.superstepCount = superstepCount;
+ this.user = user;
+ }
+
+ public BSPJobID getJobID() {
+ return jobid;
+ }
+
+ public synchronized long progress() {
+ return progress;
+ }
+
+ synchronized void setprogress(long p) {
+ this.progress = p;
+ }
+
+ public synchronized long cleanupProgress() {
+ return cleanupProgress;
+ }
+
+ synchronized void setCleanupProgress(int p) {
+ this.cleanupProgress = p;
+ }
+
+ public synchronized long setupProgress() {
+ return setupProgress;
+ }
+
+ synchronized void setSetupProgress(long p) {
+ this.setupProgress = p;
+ }
+
+ public JobStatus.State getState() {
+ return this.state;
+ }
+
+ public void setState(JobStatus.State state) {
+ this.state = state;
+ }
+
+ public synchronized int getRunState() {
+ return runState;
+ }
+
+ public synchronized void setRunState(int state) {
+ this.runState = state;
+ }
+
+ public synchronized long getSuperstepCount() {
+ return superstepCount;
+ }
+
+ public synchronized void setSuperstepCount(long superstepCount) {
+ this.superstepCount = superstepCount;
+ }
+
+ public synchronized void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public synchronized long getStartTime() {
+ return startTime;
+ }
+
+ public synchronized void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ /**
+ * Get the finish time of the job.
+ */
+ public synchronized long getFinishTime() {
+ return finishTime;
+ }
+
+ /**
+ * @param user The username of the job
+ */
+ public synchronized void setUsername(String user) {
+ this.user = user;
+ }
+
+ /**
+ * @return the username of the job
+ */
+ public synchronized String getUsername() {
+ return user;
+ }
+
+ @Override
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException cnse) {
+ throw new InternalError(cnse.toString());
+ }
+ }
+
+ public synchronized String getSchedulingInfo() {
+ return schedulingInfo;
+ }
+
+ public synchronized void setSchedulingInfo(String schedulingInfo) {
+ this.schedulingInfo = schedulingInfo;
+ }
+
+ public synchronized boolean isJobComplete() {
+ return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED);
+ }
+
+ public synchronized void write(DataOutput out) throws IOException {
+ jobid.write(out);
+ out.writeLong(setupProgress);
+ out.writeLong(progress);
+ out.writeLong(cleanupProgress);
+ out.writeInt(runState);
+ out.writeLong(startTime);
+ out.writeLong(finishTime);
+ Text.writeString(out, user);
+ Text.writeString(out, schedulingInfo);
+ out.writeLong(superstepCount);
+ }
+
+ public synchronized void readFields(DataInput in) throws IOException {
+ this.jobid = new BSPJobID();
+ jobid.readFields(in);
+ this.setupProgress = in.readLong();
+ this.progress = in.readLong();
+ this.cleanupProgress = in.readLong();
+ this.runState = in.readInt();
+ this.startTime = in.readLong();
+ this.finishTime = in.readLong();
+ this.user = Text.readString(in);
+ this.schedulingInfo = Text.readString(in);
+ this.superstepCount = in.readLong();
+ }
+
+ /**
+ * @param name the name to set
+ */
+ public synchronized void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @return the name
+ */
+ public synchronized String getName() {
+ return name;
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillJobAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillJobAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillJobAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillJobAction.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to kill the task of a job and cleanup
+ * resources.
+ */
+class KillJobAction extends GroomServerAction {
+ String jobId;
+
+ public KillJobAction() {
+ super(ActionType.KILL_JOB);
+ jobId = new String();
+ }
+
+ public KillJobAction(String killJobId) {
+ super(ActionType.KILL_JOB);
+ this.jobId = killJobId;
+ }
+
+ public String getJobID() {
+ return jobId;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, jobId);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ jobId = Text.readString(in);
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillTaskAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillTaskAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/KillTaskAction.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to kill a task.
+ */
+class KillTaskAction extends GroomServerAction {
+ TaskAttemptID taskId;
+
+ public KillTaskAction() {
+ super(ActionType.KILL_TASK);
+ taskId = new TaskAttemptID();
+ }
+
+ public KillTaskAction(TaskAttemptID killTaskId) {
+ super(ActionType.KILL_TASK);
+ this.taskId = killTaskId;
+ }
+
+ public TaskAttemptID getTaskID() {
+ return taskId;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskId.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskId.readFields(in);
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
+ */
+class LaunchTaskAction extends GroomServerAction {
+ private Task task;
+
+ public LaunchTaskAction() {
+ super(ActionType.LAUNCH_TASK);
+ }
+
+ public LaunchTaskAction(Task task) {
+ super(ActionType.LAUNCH_TASK);
+ this.task = task;
+ }
+
+ public Task getTask() {
+ return task;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ task.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ task = new BSPTask();
+ task.readFields(in);
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPMaster.State;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A multithreaded local BSP runner that can be used for debugging BSP's. It
+ * uses the working directory "/user/hama/bsp/" and starts runners based on the
+ * number of the machines core.
+ *
+ */
+public class LocalBSPRunner implements JobSubmissionProtocol {
+ public static final Log LOG = LogFactory.getLog(LocalBSPRunner.class);
+
+ private static final String IDENTIFIER = "localrunner";
+ private static String WORKING_DIR = "/user/hama/bsp/";
+ protected static volatile ThreadPoolExecutor threadPool;
+ protected static int threadPoolSize;
+ protected static final LinkedList<Future<BSP>> futureList = new LinkedList<Future<BSP>>();
+ protected static CyclicBarrier barrier;
+
+ static {
+ threadPoolSize = Runtime.getRuntime().availableProcessors();
+ barrier = new CyclicBarrier(threadPoolSize);
+ threadPool = (ThreadPoolExecutor) Executors
+ .newFixedThreadPool(threadPoolSize);
+ }
+
+ protected HashMap<String, LocalGroom> localGrooms = new HashMap<String, LocalGroom>();
+ protected String jobFile;
+ protected String jobName;
+
+ protected JobStatus currentJobStatus;
+
+ protected Configuration conf;
+ protected FileSystem fs;
+
+ public LocalBSPRunner(Configuration conf) throws IOException {
+ super();
+ this.conf = conf;
+ this.fs = FileSystem.get(conf);
+ String path = conf.get("bsp.local.dir");
+ if (path != null && !path.isEmpty())
+ WORKING_DIR = path;
+
+ threadPoolSize = conf.getInt("bsp.local.tasks.maximum", 20);
+ threadPool = (ThreadPoolExecutor) Executors
+ .newFixedThreadPool(threadPoolSize);
+ barrier = new CyclicBarrier(threadPoolSize);
+
+ for (int i = 0; i < threadPoolSize; i++) {
+ String name = IDENTIFIER + " " + i;
+ localGrooms.put(name, new LocalGroom(name));
+ }
+
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return 3;
+ }
+
+ @Override
+ public BSPJobID getNewJobId() throws IOException {
+ return new BSPJobID(IDENTIFIER, 1);
+ }
+
+ @Override
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+ this.jobFile = jobFile;
+ BSPJob job = new BSPJob(jobID, jobFile);
+ job.setNumBspTask(threadPoolSize);
+ this.jobName = job.getJobName();
+ currentJobStatus = new JobStatus(jobID, System.getProperty("user.name"), 0,
+ JobStatus.RUNNING);
+ for (int i = 0; i < threadPoolSize; i++) {
+ String name = IDENTIFIER + " " + i;
+ LocalGroom localGroom = new LocalGroom(name);
+ localGrooms.put(name, localGroom);
+ futureList.add(threadPool.submit(new BSPRunner(conf, job, ReflectionUtils
+ .newInstance(job.getBspClass(), conf), localGroom)));
+ }
+ new Thread(new ThreadObserver(currentJobStatus)).start();
+ return currentJobStatus;
+ }
+
+ @Override
+ public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+ Map<String, String> map = new HashMap<String, String>();
+ for (Entry<String, LocalGroom> entry : localGrooms.entrySet()) {
+ map.put(entry.getKey(), entry.getValue().getPeerName());
+ }
+ return new ClusterStatus(map, threadPoolSize, threadPoolSize, State.RUNNING);
+ }
+
+ @Override
+ public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
+ return new JobProfile(System.getProperty("user.name"), jobid, jobFile,
+ jobName);
+ }
+
+ @Override
+ public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
+ if (currentJobStatus == null) {
+ currentJobStatus = new JobStatus(jobid, System.getProperty("user.name"),
+ 0L, JobStatus.RUNNING);
+ }
+ return currentJobStatus;
+ }
+
+ @Override
+ public String getFilesystemName() throws IOException {
+ return fs.getUri().toString();
+ }
+
+ @Override
+ public JobStatus[] jobsToComplete() throws IOException {
+ return null;
+ }
+
+ @Override
+ public JobStatus[] getAllJobs() throws IOException {
+ return null;
+ }
+
+ @Override
+ public String getSystemDir() {
+ return WORKING_DIR;
+ }
+
+ @Override
+ public void killJob(BSPJobID jobid) throws IOException {
+ return;
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+ throws IOException {
+ return false;
+ }
+
+ // this class will spawn a new thread and executes the BSP
+ class BSPRunner implements Callable<BSP> {
+
+ Configuration conf;
+ BSPJob job;
+ BSP bsp;
+ LocalGroom groom;
+
+ public BSPRunner(Configuration conf, BSPJob job, BSP bsp, LocalGroom groom) {
+ super();
+ this.conf = conf;
+ this.job = job;
+ this.bsp = bsp;
+ this.groom = groom;
+ }
+
+ public void run() {
+ bsp.setConf(conf);
+ try {
+ bsp.bsp(groom);
+ } catch (Exception e) {
+ LOG.error("Exception during BSP execution!", e);
+ }
+ }
+
+ @Override
+ public BSP call() throws Exception {
+ run();
+ return bsp;
+ }
+ }
+
+ // this thread observes the status of the runners.
+ class ThreadObserver implements Runnable {
+
+ JobStatus status;
+
+ public ThreadObserver(JobStatus currentJobStatus) {
+ this.status = currentJobStatus;
+ }
+
+ @Override
+ public void run() {
+ boolean success = true;
+ for (Future<BSP> future : futureList) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ LOG.error("Exception during BSP execution!", e);
+ success = false;
+ } catch (ExecutionException e) {
+ LOG.error("Exception during BSP execution!", e);
+ success = false;
+ }
+ }
+ if (success) {
+ currentJobStatus.setState(JobStatus.State.SUCCEEDED);
+ currentJobStatus.setRunState(JobStatus.SUCCEEDED);
+ } else {
+ currentJobStatus.setState(JobStatus.State.FAILED);
+ currentJobStatus.setRunState(JobStatus.FAILED);
+ }
+ threadPool.shutdownNow();
+ }
+
+ }
+
+ class LocalGroom extends BSPPeer {
+ private long superStepCount = 0;
+ private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
+ // outgoing queue
+ private final Map<String, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<BSPMessage>>();
+ private final String peerName;
+
+ public LocalGroom(String peerName) throws IOException {
+ this.peerName = peerName;
+ }
+
+ @Override
+ public void send(String peerName, BSPMessage msg) throws IOException {
+ if (this.peerName.equals(peerName)) {
+ put(msg);
+ } else {
+ // put this into a outgoing queue
+ if (outgoingQueues.get(peerName) == null) {
+ outgoingQueues.put(peerName, new ConcurrentLinkedQueue<BSPMessage>());
+ }
+ outgoingQueues.get(peerName).add(msg);
+ }
+ }
+
+ @Override
+ public void put(BSPMessage msg) throws IOException {
+ localMessageQueue.add(msg);
+ }
+
+ @Override
+ public BSPMessage getCurrentMessage() throws IOException {
+ return localMessageQueue.poll();
+ }
+
+ @Override
+ public int getNumCurrentMessages() {
+ return localMessageQueue.size();
+ }
+
+ @Override
+ public void sync() throws IOException, KeeperException,
+ InterruptedException {
+ // wait until all threads reach this barrier
+ barrierSync();
+ // send the messages
+ for (Entry<String, ConcurrentLinkedQueue<BSPMessage>> entry : outgoingQueues
+ .entrySet()) {
+ String peerName = entry.getKey();
+ for (BSPMessage msg : entry.getValue())
+ localGrooms.get(peerName).put(msg);
+ }
+ // clear the local outgoing queue
+ outgoingQueues.clear();
+ // sync again to avoid data inconsistency
+ barrierSync();
+ incrementSuperSteps();
+ }
+
+ private void barrierSync() throws InterruptedException {
+ try {
+ barrier.await();
+ } catch (BrokenBarrierException e) {
+ throw new InterruptedException("Barrier has been broken!" + e);
+ }
+ }
+
+ private void incrementSuperSteps() {
+ currentJobStatus.setprogress(superStepCount++);
+ currentJobStatus.setSuperstepCount(currentJobStatus.progress());
+ }
+
+ @Override
+ public long getSuperstepCount() {
+ return superStepCount;
+ }
+
+ @Override
+ public String getPeerName() {
+ return peerName;
+ }
+
+ @Override
+ public String[] getAllPeerNames() {
+ return localGrooms.keySet().toArray(
+ new String[localGrooms.keySet().size()]);
+ }
+
+ @Override
+ public void clear() {
+ localMessageQueue.clear();
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return 3;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void put(BSPMessageBundle messages) throws IOException {
+ }
+
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Messagable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Messagable.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Messagable.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Messagable.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * A interface for BSP message class.
+ */
+public interface Messagable {
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ *
+ */
+public class PeerNames implements Writable {
+ Collection<String> allPeers;
+
+ public PeerNames() {
+ this.allPeers = new ArrayList<String>();
+ }
+
+ public PeerNames(Collection<String> allPeers) {
+ this.allPeers = allPeers;
+ }
+
+ public Collection<String> getAllPeerNames() {
+ return allPeers;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(allPeers.size());
+ for (String peerName : allPeers) {
+ Text.writeString(out, peerName);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int peersNum = in.readInt();
+ for (int i = 0; i < peersNum; i++) {
+ allPeers.add(Text.readString(in));
+ }
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Queue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Queue.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Queue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Queue.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+
+/**
+ * Job Queue interface.
+ *
+ * @param <T>
+ */
+public interface Queue<T>{
+
+ /**
+ * The queue name.
+ * @return the name of current queue.
+ */
+ String getName();
+
+ /**
+ * Add a job to a queue.
+ * @param job to be added to the queue.
+ */
+ void addJob(T job);
+
+ /**
+ * Remove a job from the queue.
+ * @param job to be removed from the queue.
+ */
+ void removeJob(T job);
+
+ /**
+ * Get a job
+ * @return job that is removed from the queue.
+ */
+ T removeJob();
+
+ /**
+ * Return all data stored in this queue.
+ * @return Collection of jobs.
+ */
+ public Collection<T> jobs();
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A BSPJob Queue Manager.
+ */
+public class QueueManager{
+
+ private ConcurrentMap<String, Queue<JobInProgress>> queues =
+ new ConcurrentHashMap<String, Queue<JobInProgress>>();
+
+ public QueueManager(Configuration conf){ }
+
+ /**
+ * Initialize a job.
+ * @param job required initialzied.
+ */
+ public void initJob(JobInProgress job){
+ try{
+ //job.updateStatus();
+ job.initTasks();
+ }catch(IOException ioe){
+ ioe.printStackTrace();
+ }
+ }
+
+ /**
+ * Add a job to the specified queue.
+ * @param name of the queue.
+ * @param job to be added.
+ */
+ public void addJob(String name, JobInProgress job){
+ Queue<JobInProgress> queue = queues.get(name);
+ if(null != queue) queue.addJob(job);
+ }
+
+ /**
+ * Remove a job from the head of a designated queue.
+ * @param name from which a job is removed.
+ * @param job to be removed from the queue.
+ */
+ public void removeJob(String name, JobInProgress job){
+ Queue<JobInProgress> queue = queues.get(name);
+ if(null != queue) queue.removeJob(job);
+ }
+
+ /**
+ * Move a job from a queue to another.
+ * @param from a queue a job is to be removed.
+ * @param to a queue a job is to be added.
+ */
+ public void moveJob(String from, String to, JobInProgress job){
+ synchronized(queues){
+ removeJob(from, job);
+ addJob(to, job);
+ }
+ }
+
+ /**
+ * Create a FCFS queue with the name provided.
+ * @param name of the queue.
+ */
+ public void createFCFSQueue(String name){
+ queues.putIfAbsent(name, new FCFSQueue(name));
+ }
+
+ /**
+ * Find Queue according to the name specified.
+ * @param name of the queue.
+ * @return queue of JobInProgress
+ */
+ public Queue<JobInProgress> findQueue(String name){
+ return queues.get(name);
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to reinitialize itself.
+ */
+class ReinitGroomAction extends GroomServerAction {
+
+ public ReinitGroomAction() {
+ super(ActionType.REINIT_GROOM);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Reports status of GroomServer.
+ */
+public class ReportGroomStatusDirective extends Directive implements Writable {
+ public static final Log LOG = LogFactory.getLog(ReportGroomStatusDirective.class);
+
+ private GroomServerStatus status;
+
+ public ReportGroomStatusDirective(){ super(); }
+
+ public ReportGroomStatusDirective(GroomServerStatus status) {
+ super(Directive.Type.Response);
+ this.status = status;
+ }
+
+ public GroomServerStatus getStatus() {
+ return this.status;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ this.status.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ this.status = new GroomServerStatus();
+ this.status.readFields(in);
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * <code>RunningJob</code> is the user-interface to query for details on a
+ * running BSP job.
+ *
+ * <p>
+ * Clients can get hold of <code>RunningJob</code> via the {@link BSPJobClient}
+ * and then query the running-job for details such as name, configuration,
+ * progress etc.
+ * </p>
+ *
+ * @see BSPJobClient
+ */
+public interface RunningJob {
+ /**
+ * Get the job identifier.
+ *
+ * @return the job identifier.
+ */
+ public BSPJobID getID();
+
+ /**
+ * Get the name of the job.
+ *
+ * @return the name of the job.
+ */
+ public String getJobName();
+
+ /**
+ * Get the path of the submitted job configuration.
+ *
+ * @return the path of the submitted job configuration.
+ */
+ public String getJobFile();
+
+ /**
+ * Get the <i>progress</i> of the job's tasks, as a float between 0.0 and 1.0.
+ * When all bsp tasks have completed, the function returns 1.0.
+ *
+ * @return the progress of the job's tasks.
+ * @throws IOException
+ */
+ public long progress() throws IOException;
+
+ /**
+ * Check if the job is finished or not. This is a non-blocking call.
+ *
+ * @return <code>true</code> if the job is complete, else <code>false</code>.
+ * @throws IOException
+ */
+ public boolean isComplete() throws IOException;
+
+ /**
+ * Check if the job completed successfully.
+ *
+ * @return <code>true</code> if the job succeeded, else <code>false</code>.
+ * @throws IOException
+ */
+ public boolean isSuccessful() throws IOException;
+
+ /**
+ * Blocks until the job is complete.
+ *
+ * @throws IOException
+ */
+ public void waitForCompletion() throws IOException;
+
+ /**
+ * Returns the current state of the Job. {@link JobStatus}
+ *
+ * @throws IOException
+ */
+ public int getJobState() throws IOException;
+
+ /**
+ * Kill the running job. Blocks until all job tasks have been killed as well.
+ * If the job is no longer running, it simply returns.
+ *
+ * @throws IOException
+ */
+ public void killJob() throws IOException;
+
+ /**
+ * Kill indicated task attempt.
+ *
+ * @param taskId the id of the task to be terminated.
+ * @param shouldFail if true the task is failed and added to failed tasks
+ * list, otherwise it is just killed, w/o affecting job failure
+ * status.
+ * @throws IOException
+ */
+ public void killTask(TaskAttemptID taskId, boolean shouldFail)
+ throws IOException;
+
+ public long getSuperstepCount() throws IOException;
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * This is the class that schedules commands to GroomServer(s)
+ */
+public interface Schedulable{
+
+ /**
+ * Schedule job to designated GroomServer(s) immediately.
+ * @param job to be scheduled.
+ * @param statuses of GroomServer(s).
+ * @throws IOException
+ */
+ void schedule(JobInProgress job, GroomServerStatus... statuses)
+ throws IOException;
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.ipc.GroomProtocol;
+
+/**
+ * A simple task scheduler.
+ */
+class SimpleTaskScheduler extends TaskScheduler {
+
+ private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);
+
+ public static final String WAIT_QUEUE = "waitQueue";
+ public static final String PROCESSING_QUEUE = "processingQueue";
+ public static final String FINISHED_QUEUE = "finishedQueue";
+
+ private QueueManager queueManager;
+ private volatile boolean initialized;
+ private JobListener jobListener;
+ private JobProcessor jobProcessor;
+
+ private class JobListener extends JobInProgressListener {
+ @Override
+ public void jobAdded(JobInProgress job) throws IOException {
+ queueManager.initJob(job); // init task
+ queueManager.addJob(WAIT_QUEUE, job);
+ }
+
+ @Override
+ public void jobRemoved(JobInProgress job) throws IOException {
+ // queueManager.removeJob(WAIT_QUEUE, job);
+ queueManager.moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
+ }
+ }
+
+ private class JobProcessor extends Thread implements Schedulable {
+ JobProcessor() {
+ super("JobProcess");
+ }
+
+ /**
+ * Main logic scheduling task to GroomServer(s). Also, it will move
+ * JobInProgress from Wait Queue to Processing Queue.
+ */
+ public void run() {
+ if (false == initialized) {
+ throw new IllegalStateException("SimpleTaskScheduler initialization"
+ + " is not yet finished!");
+ }
+ while (initialized) {
+ Queue<JobInProgress> queue = queueManager.findQueue(WAIT_QUEUE);
+ if (null == queue) {
+ LOG.error(WAIT_QUEUE + " does not exist.");
+ throw new NullPointerException(WAIT_QUEUE + " does not exist.");
+ }
+ // move a job from the wait queue to the processing queue
+ JobInProgress j = queue.removeJob();
+ queueManager.addJob(PROCESSING_QUEUE, j);
+ // schedule
+ Collection<GroomServerStatus> glist = groomServerManager
+ .groomServerStatusKeySet();
+ schedule(j, (GroomServerStatus[]) glist
+ .toArray(new GroomServerStatus[glist.size()]));
+ }
+ }
+
+ /**
+ * Schedule job to designated GroomServer(s) immediately.
+ *
+ * @param Targeted GroomServer(s).
+ * @param Job to be scheduled.
+ */
+ @Override
+ public void schedule(JobInProgress job, GroomServerStatus... statuses) {
+ ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
+ final int numGroomServers = clusterStatus.getGroomServers();
+ final ScheduledExecutorService sched = Executors
+ .newScheduledThreadPool(statuses.length + 5);
+ for (GroomServerStatus status : statuses) {
+ sched
+ .schedule(new TaskWorker(status, numGroomServers, job), 0, SECONDS);
+ }// for
+ }
+ }
+
+ private class TaskWorker implements Runnable {
+ private final GroomServerStatus stus;
+ private final int groomNum;
+ private final JobInProgress jip;
+
+ TaskWorker(final GroomServerStatus stus, final int num,
+ final JobInProgress jip) {
+ this.stus = stus;
+ this.groomNum = num;
+ this.jip = jip;
+ if (null == this.stus)
+ throw new NullPointerException("Target groom server is not "
+ + "specified.");
+ if (-1 == this.groomNum)
+ throw new IllegalArgumentException("Groom number is not specified.");
+ if (null == this.jip)
+ throw new NullPointerException("No job is specified.");
+ }
+
+ public void run() {
+ // obtain tasks
+ Task t = jip.obtainNewTask(this.stus, groomNum);
+
+ // assembly into actions
+ // List<Task> tasks = new ArrayList<Task>();
+ if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
+ GroomProtocol worker = groomServerManager.findGroomServer(this.stus);
+ try {
+ // dispatch() to the groom server
+ Directive d1 = new DispatchTasksDirective(groomServerManager
+ .currentGroomServerPeers(), new GroomServerAction[] {
+ new LaunchTaskAction(t)});
+ worker.dispatch(d1);
+ } catch (IOException ioe) {
+ LOG.error("Fail to dispatch tasks to GroomServer "
+ + this.stus.getGroomName(), ioe);
+ }
+ } else {
+ LOG.warn("Currently master only shcedules job in running state. "
+ + "This may be refined in the future. JobId:" + jip.getJobID());
+ }
+ }
+ }
+
+ public SimpleTaskScheduler() {
+ this.jobListener = new JobListener();
+ this.jobProcessor = new JobProcessor();
+ }
+
+ @Override
+ public void start() {
+ this.queueManager = new QueueManager(getConf()); // TODO: need factory?
+ this.queueManager.createFCFSQueue(WAIT_QUEUE);
+ this.queueManager.createFCFSQueue(PROCESSING_QUEUE);
+ this.queueManager.createFCFSQueue(FINISHED_QUEUE);
+ groomServerManager.addJobInProgressListener(this.jobListener);
+ this.initialized = true;
+ this.jobProcessor.start();
+ }
+
+ @Override
+ public void terminate() {
+ this.initialized = false;
+ if (null != this.jobListener)
+ groomServerManager.removeJobInProgressListener(this.jobListener);
+ }
+
+ @Override
+ public Collection<JobInProgress> getJobs(String queue) {
+ return (queueManager.findQueue(queue)).jobs();
+ // return jobQueue;
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.ipc.BSPPeerProtocol;
+
+/**
+ * Base class for tasks.
+ */
+public abstract class Task implements Writable {
+ public static final Log LOG = LogFactory.getLog(Task.class);
+ // //////////////////////////////////////////
+ // Fields
+ // //////////////////////////////////////////
+
+ protected BSPJobID jobId;
+ protected String jobFile;
+ protected TaskAttemptID taskId;
+ protected int partition;
+
+ protected LocalDirAllocator lDirAlloc;
+
+ public Task() {
+ jobId = new BSPJobID();
+ taskId = new TaskAttemptID();
+ }
+
+ public Task(BSPJobID jobId, String jobFile, TaskAttemptID taskId,
+ int partition) {
+ this.jobId = jobId;
+ this.jobFile = jobFile;
+ this.taskId = taskId;
+ this.partition = partition;
+ }
+
+ // //////////////////////////////////////////
+ // Accessors
+ // //////////////////////////////////////////
+ public void setJobFile(String jobFile) {
+ this.jobFile = jobFile;
+ }
+
+ public String getJobFile() {
+ return jobFile;
+ }
+
+ public TaskAttemptID getTaskAttemptId() {
+ return this.taskId;
+ }
+
+ public TaskAttemptID getTaskID() {
+ return taskId;
+ }
+
+ /**
+ * Get the job name for this task.
+ *
+ * @return the job name
+ */
+ public BSPJobID getJobID() {
+ return jobId;
+ }
+
+ /**
+ * Get the index of this task within the job.
+ *
+ * @return the integer part of the task id
+ */
+ public int getPartition() {
+ return partition;
+ }
+
+ @Override
+ public String toString() {
+ return taskId.toString();
+ }
+
+ // //////////////////////////////////////////
+ // Writable
+ // //////////////////////////////////////////
+ @Override
+ public void write(DataOutput out) throws IOException {
+ jobId.write(out);
+ Text.writeString(out, jobFile);
+ taskId.write(out);
+ out.writeInt(partition);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ jobId.readFields(in);
+ jobFile = Text.readString(in);
+ taskId.readFields(in);
+ partition = in.readInt();
+ }
+
+ /**
+ * Run this task as a part of the named job. This method is executed in the
+ * child process.
+ *
+ * @param bspPeer for communications
+ * @param umbilical for communications with GroomServer
+ */
+ public abstract void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
+ throws IOException;
+
+ public abstract BSPTaskRunner createRunner(GroomServer groom);
+
+ public void done(BSPPeerProtocol umbilical) throws IOException {
+ umbilical.done(getTaskID(), true);
+ }
+
+ public abstract BSPJob getConf();
+ public abstract void setConf(BSPJob localJobConf);
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The context for task attempts.
+ */
+public class TaskAttemptContext extends BSPJobContext implements Progressable {
+ private final TaskAttemptID taskId;
+ private String status = "";
+
+ public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+ super(conf, taskId.getJobID());
+ this.taskId = taskId;
+ }
+
+ /**
+ * Get the unique name for this task attempt.
+ */
+ public TaskAttemptID getTaskAttemptID() {
+ return taskId;
+ }
+
+ /**
+ * Set the current status of the task to the given string.
+ */
+ public void setStatus(String msg) throws IOException {
+ status = msg;
+ }
+
+ /**
+ * Get the last set status message.
+ *
+ * @return the current status message
+ */
+ public String getStatus() {
+ return status;
+ }
+
+ /**
+ * Report progress. The subtypes actually do work in this method.
+ */
+ public void progress() {
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * TaskAttemptID is a unique identifier for a task attempt.
+ */
+public class TaskAttemptID extends ID {
+ protected static final String ATTEMPT = "attempt";
+ private TaskID taskId;
+
+ public TaskAttemptID(TaskID taskId, int id) {
+ super(id);
+ if (taskId == null) {
+ throw new IllegalArgumentException("taskId cannot be null");
+ }
+ this.taskId = taskId;
+ }
+
+ public TaskAttemptID(String jtIdentifier, int jobId, int taskId, int id) {
+ this(new TaskID(jtIdentifier, jobId, taskId), id);
+ }
+
+ public TaskAttemptID() {
+ taskId = new TaskID();
+ }
+
+ public BSPJobID getJobID() {
+ return taskId.getJobID();
+ }
+
+ public TaskID getTaskID() {
+ return taskId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!super.equals(o))
+ return false;
+
+ TaskAttemptID that = (TaskAttemptID) o;
+ return this.taskId.equals(that.taskId);
+ }
+
+ protected StringBuilder appendTo(StringBuilder builder) {
+ return taskId.appendTo(builder).append(SEPARATOR).append(id);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ taskId.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ taskId.write(out);
+ }
+
+ @Override
+ public int hashCode() {
+ return taskId.hashCode() * 5 + id;
+ }
+
+ @Override
+ public int compareTo(ID o) {
+ TaskAttemptID that = (TaskAttemptID) o;
+ int tipComp = this.taskId.compareTo(that.taskId);
+ if (tipComp == 0) {
+ return this.id - that.id;
+ } else
+ return tipComp;
+ }
+
+ @Override
+ public String toString() {
+ return appendTo(new StringBuilder(ATTEMPT)).toString();
+ }
+
+ public static TaskAttemptID forName(String str)
+ throws IllegalArgumentException {
+ if (str == null)
+ return null;
+ try {
+ String[] parts = str.split(Character.toString(SEPARATOR));
+ if (parts.length == 5) {
+ if (parts[0].equals(ATTEMPT)) {
+ return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
+ Integer.parseInt(parts[3]), Integer.parseInt(parts[4]));
+ }
+ }
+ } catch (Exception ex) {
+ // fall below
+ }
+ throw new IllegalArgumentException("TaskAttemptId string : " + str
+ + " is not properly formed");
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskID.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskID.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskID.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+/**
+ * TaskID represents the immutable and unique identifier for a BSP Task.
+ */
+public class TaskID extends ID {
+ protected static final String TASK = "task";
+ protected static final NumberFormat idFormat = NumberFormat.getInstance();
+ static {
+ idFormat.setGroupingUsed(false);
+ idFormat.setMinimumIntegerDigits(6);
+ }
+
+ private BSPJobID jobId;
+
+ public TaskID(BSPJobID jobId, int id) {
+ super(id);
+ if (jobId == null) {
+ throw new IllegalArgumentException("jobId cannot be null");
+ }
+ this.jobId = jobId;
+ }
+
+ public TaskID(String jtIdentifier, int jobId, int id) {
+ this(new BSPJobID(jtIdentifier, jobId), id);
+ }
+
+ public TaskID() {
+ jobId = new BSPJobID();
+ }
+
+ /** Returns the {@link BSPJobID} object that this tip belongs to */
+ public BSPJobID getJobID() {
+ return jobId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!super.equals(o))
+ return false;
+
+ TaskID that = (TaskID) o;
+ return this.jobId.equals(that.jobId);
+ }
+
+ @Override
+ public int compareTo(ID o) {
+ TaskID that = (TaskID) o;
+ int jobComp = this.jobId.compareTo(that.jobId);
+ if (jobComp == 0) {
+ return this.id - that.id;
+ } else {
+ return jobComp;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return appendTo(new StringBuilder(TASK)).toString();
+ }
+
+ protected StringBuilder appendTo(StringBuilder builder) {
+ return jobId.appendTo(builder).append(SEPARATOR)
+ .append(idFormat.format(id));
+ }
+
+ @Override
+ public int hashCode() {
+ return jobId.hashCode() * 524287 + id;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ jobId.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ jobId.write(out);
+ }
+
+ public static TaskID forName(String str) throws IllegalArgumentException {
+ if (str == null)
+ return null;
+ try {
+ String[] parts = str.split("_");
+ if (parts.length == 5) {
+ if (parts[0].equals(TASK)) {
+ return new TaskID(parts[1], Integer.parseInt(parts[2]), Integer
+ .parseInt(parts[4]));
+ }
+ }
+ } catch (Exception ex) {
+ }
+ throw new IllegalArgumentException("TaskId string : " + str
+ + " is not properly formed");
+ }
+}