You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/04 11:11:05 UTC
svn commit: r1004154 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/
Author: edwardyoon
Date: Mon Oct 4 09:11:04 2010
New Revision: 1004154
URL: http://svn.apache.org/viewvc?rev=1004154&view=rev
Log:
Implementations of Task progress
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Oct 4 09:11:04 2010
@@ -46,6 +46,7 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HAMA-286: Task progress should be monitored (eddieyoon)
HAMA-287: BSPMaster should use the bsp.master.port config property
when creating its InetSocketAddr instance (Filipe Manana via edwardyoon)
HAMA-283: Removing duplicate code (Filipe Manana via edwardyoon)
@@ -153,6 +154,8 @@ Trunk (unreleased changes)
BUG FIXES
+ HAMA-291: bsp.groom.port is unused and superseded
+ by bsp.peer.port (Filipe Manana via edwardyoon)
HAMA-288: Typo might lead to null tasks getting assigned
to groom servers (Filipe Manana via edwardyoon)
HAMA-280: Fix warnings (Filipe Manana via edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Mon Oct 4 09:11:04 2010
@@ -38,32 +38,37 @@ import org.apache.hama.ipc.JobSubmission
public class BSPJobClient extends Configured {
private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
- public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
+
+ public static enum TaskStatusFilter {
+ NONE, KILLED, FAILED, SUCCEEDED, ALL
+ }
+
private static final long MAX_JOBPROFILE_AGE = 1000 * 2;
class NetworkedJob implements RunningJob {
JobProfile profile;
JobStatus status;
long statustime;
-
+
public NetworkedJob(JobStatus job) throws IOException {
this.status = job;
this.profile = jobSubmitClient.getJobProfile(job.getJobID());
this.statustime = System.currentTimeMillis();
}
-
+
/**
- * Some methods rely on having a recent job profile object. Refresh
- * it, if necessary
+ * Some methods rely on having a recent job profile object. Refresh it, if
+ * necessary
*/
synchronized void ensureFreshStatus() throws IOException {
if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
updateStatus();
}
}
-
- /** Some methods need to update status immediately. So, refresh
- * immediately
+
+ /**
+ * Some methods need to update status immediately. So, refresh immediately
+ *
* @throws IOException
*/
synchronized void updateStatus() throws IOException {
@@ -71,15 +76,17 @@ public class BSPJobClient extends Config
this.statustime = System.currentTimeMillis();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
* @see org.apache.hama.bsp.RunningJob#getID()
*/
@Override
- public BSPJobID getID() {
+ public BSPJobID getID() {
return profile.getJobID();
}
-
- /* (non-Javadoc)
+
+ /*
+ * (non-Javadoc)
* @see org.apache.hama.bsp.RunningJob#getJobName()
*/
@Override
@@ -87,30 +94,30 @@ public class BSPJobClient extends Config
return profile.getJobName();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
* @see org.apache.hama.bsp.RunningJob#getJobFile()
*/
@Override
public String getJobFile() {
return profile.getJobFile();
}
-
+
@Override
public float progress() throws IOException {
ensureFreshStatus();
return status.progress();
}
-
+
/**
* Returns immediately whether the whole job is done yet or not.
*/
public synchronized boolean isComplete() throws IOException {
updateStatus();
- return (status.getRunState() == JobStatus.SUCCEEDED ||
- status.getRunState() == JobStatus.FAILED ||
- status.getRunState() == JobStatus.KILLED);
+ return (status.getRunState() == JobStatus.SUCCEEDED
+ || status.getRunState() == JobStatus.FAILED || status.getRunState() == JobStatus.KILLED);
}
-
+
/**
* True iff job completed successfully.
*/
@@ -118,7 +125,7 @@ public class BSPJobClient extends Config
updateStatus();
return status.getRunState() == JobStatus.SUCCEEDED;
}
-
+
/**
* Blocks until the job is finished
*/
@@ -138,7 +145,7 @@ public class BSPJobClient extends Config
updateStatus();
return status.getRunState();
}
-
+
/**
* Tells the service to terminate the current job.
*/
@@ -149,21 +156,21 @@ public class BSPJobClient extends Config
@Override
public void killTask(TaskAttemptID taskId, boolean shouldFail)
throws IOException {
- jobSubmitClient.killTask(taskId, shouldFail);
- }
+ jobSubmitClient.killTask(taskId, shouldFail);
+ }
}
-
+
private JobSubmissionProtocol jobSubmitClient = null;
private Path sysDir = null;
private FileSystem fs = null;
-
+
// job files are world-wide readable and owner writable
- final private static FsPermission JOB_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
+ final private static FsPermission JOB_FILE_PERMISSION = FsPermission
+ .createImmutable((short) 0644); // rw-r--r--
// job submission directory is world readable/writable/executable
- final static FsPermission JOB_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
+ final static FsPermission JOB_DIR_PERMISSION = FsPermission
+ .createImmutable((short) 0777); // rwx-rwx-rwx
public BSPJobClient(Configuration conf) throws IOException {
setConf(conf);
@@ -171,27 +178,28 @@ public class BSPJobClient extends Config
}
public void init(Configuration conf) throws IOException {
- // it will be used to determine if the bspmaster is running on local or not.
+ // it will be used to determine if the bspmaster is running on local or not.
String master = conf.get("bsp.master.address", "local");
if ("local".equals(master)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
- this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(
- conf, JobSubmissionProtocol.class));
+ this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
+ JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
+ BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(conf,
+ JobSubmissionProtocol.class));
}
}
/**
* Close the <code>JobClient</code>.
*/
- public synchronized void close() throws IOException {
- RPC.stopProxy(jobSubmitClient);
+ public synchronized void close() throws IOException {
+ RPC.stopProxy(jobSubmitClient);
}
-
+
/**
- * Get a filesystem handle. We need this to prepare jobs
- * for submission to the BSP system.
+ * Get a filesystem handle. We need this to prepare jobs for submission to the
+ * BSP system.
*
* @return the filesystem handle.
*/
@@ -202,22 +210,22 @@ public class BSPJobClient extends Config
}
return fs;
}
-
- private UnixUserGroupInformation getUGI(Configuration conf) throws IOException {
+
+ private UnixUserGroupInformation getUGI(Configuration conf)
+ throws IOException {
UnixUserGroupInformation ugi = null;
try {
ugi = UnixUserGroupInformation.login(conf, true);
} catch (LoginException e) {
- throw (IOException)(new IOException(
+ throw (IOException) (new IOException(
"Failed to get the current user's information.").initCause(e));
}
return ugi;
}
-
+
/**
- * Submit a job to the BSP system.
- * This returns a handle to the {@link RunningJob} which can be used to track
- * the running-job.
+ * Submit a job to the BSP system. This returns a handle to the
+ * {@link RunningJob} which can be used to track the running-job.
*
* @param job the job configuration.
* @return a handle to the {@link RunningJob} which can be used to track the
@@ -226,25 +234,24 @@ public class BSPJobClient extends Config
* @throws IOException
*/
public RunningJob submitJob(BSPJob job) throws FileNotFoundException,
- IOException {
- return submitJobInternal(job);
+ IOException {
+ return submitJobInternal(job);
}
-
- public
- RunningJob submitJobInternal(BSPJob job) throws IOException {
+
+ public RunningJob submitJobInternal(BSPJob job) throws IOException {
BSPJobID jobId = jobSubmitClient.getNewJobId();
Path submitJobDir = new Path(getSystemDir(), jobId.toString());
- Path submitJarFile = new Path(submitJobDir, "job.jar");
+ Path submitJarFile = new Path(submitJobDir, "job.jar");
Path submitJobFile = new Path(submitJobDir, "job.xml");
-
+
LOG.debug("BSPJobClient.submitJobDir: " + submitJobDir);
-
+
/*
* set this user's id in job configuration, so later job files can be
* accessed using this user's id
*/
UnixUserGroupInformation ugi = getUGI(job.getConf());
-
+
// Create a number of filenames in the BSPMaster's fs namespace
FileSystem fs = getFs();
fs.delete(submitJobDir, true);
@@ -253,25 +260,25 @@ public class BSPJobClient extends Config
FsPermission bspSysPerms = new FsPermission(JOB_DIR_PERMISSION);
FileSystem.mkdirs(fs, submitJobDir, bspSysPerms);
fs.mkdirs(submitJobDir);
- short replication = (short)job.getInt("bsp.submit.replication", 10);
-
+ short replication = (short) job.getInt("bsp.submit.replication", 10);
+
String originalJarPath = job.getJar();
if (originalJarPath != null) { // copy jar to BSPMaster's fs
- // use jar name if job is not named.
- if ("".equals(job.getJobName())){
+ // use jar name if job is not named.
+ if ("".equals(job.getJobName())) {
job.setJobName(new Path(originalJarPath).getName());
}
job.setJar(submitJarFile.toString());
fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
-
+
fs.setReplication(submitJarFile, replication);
fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
} else {
- LOG.warn("No job jar file set. User classes may not be found. "+
- "See BSPJob#setJar(String) or check Your jar file.");
+ LOG.warn("No job jar file set. User classes may not be found. "
+ + "See BSPJob#setJar(String) or check Your jar file.");
}
-
+
// Set the user's name and working directory
job.setUser(ugi.getUserName());
if (ugi.getGroupNames().length > 0) {
@@ -280,22 +287,22 @@ public class BSPJobClient extends Config
if (job.getWorkingDirectory() == null) {
job.setWorkingDirectory(fs.getWorkingDirectory());
}
-
- // Write job file to BSPMaster's fs
- FSDataOutputStream out =
- FileSystem.create(fs, submitJobFile,
- new FsPermission(JOB_FILE_PERMISSION));
-
+
+ // Write job file to BSPMaster's fs
+ FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
+ new FsPermission(JOB_FILE_PERMISSION));
+
try {
job.writeXml(out);
} finally {
out.close();
}
-
+
//
// Now, actually submit the job (using the submit name)
//
- JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString());
+ JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile
+ .toString());
if (status != null) {
return new NetworkedJob(status);
} else {
@@ -304,7 +311,7 @@ public class BSPJobClient extends Config
}
/**
- * Monitor a job and print status in real-time as progress is made and tasks
+ * Monitor a job and print status in real-time as progress is made and tasks
* fail.
*
* @param job
@@ -313,29 +320,30 @@ public class BSPJobClient extends Config
* @throws IOException
* @throws InterruptedException
*/
- public boolean monitorAndPrintJob (BSPJob job, RunningJob info)
- throws IOException, InterruptedException {
-
+ public boolean monitorAndPrintJob(BSPJob job, RunningJob info)
+ throws IOException, InterruptedException {
+
String lastReport = null;
BSPJobID jobId = job.getJobID();
LOG.info("Running job: " + jobId);
-
+
while (!job.isComplete()) {
Thread.sleep(1000);
String report = " bsp " + StringUtils.formatPercent(job.progress(), 0);
-
+
if (!report.equals(lastReport)) {
LOG.info(report);
lastReport = report;
- }
+ }
}
-
+
LOG.info("Job complete: " + jobId);
return job.isSuccessful();
}
-
+
/**
- * Grab the bspmaster system directory path where job-specific files are to be placed.
+ * Grab the bspmaster system directory path where job-specific files are to be
+ * placed.
*
* @return the system directory where job-specific files are to be placed.
*/
@@ -346,20 +354,33 @@ public class BSPJobClient extends Config
return sysDir;
}
- public static RunningJob runJob(BSPJob job) throws FileNotFoundException,
- IOException {
+ public static void runJob(BSPJob job) throws FileNotFoundException,
+ IOException {
BSPJobClient jc = new BSPJobClient(job.getConf());
- return jc.submitJobInternal(job);
+ RunningJob running = jc.submitJobInternal(job);
+ String jobId = running.getID().toString();
+ LOG.info("Running job: " + jobId);
+
+ while (true) {
+ if (running.isComplete()) {
+ LOG.info("Job complete: " + jobId);
+ break;
+ }
+ }
+
+ // TODO if error found, kill job
+ //running.killJob();
+ jc.close();
}
-
+
/**
* Get status information about the BSP cluster
*
* @throws IOException
*/
public ClusterStatus getClusterStatus() throws IOException {
- // TODO:
-
+ // TODO:
+
return null;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Mon Oct 4 09:11:04 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -240,7 +241,7 @@ public class BSPMaster implements JobSub
String hamaMasterStr = conf.get("bsp.master.address", "localhost");
int defaultPort = conf.getInt("bsp.master.port", 40000);
- return NetUtils.createSocketAddr(hamaMasterStr, defaultPort);
+ return NetUtils.createSocketAddr(hamaMasterStr, defaultPort);
}
private static SimpleDateFormat getDateFormat() {
@@ -341,7 +342,7 @@ public class BSPMaster implements JobSub
@Override
public HeartbeatResponse heartbeat(GroomServerStatus status,
boolean restarted, boolean initialContact, boolean acceptNewTasks,
- short responseId) throws IOException {
+ short responseId, int reportSize) throws IOException {
// First check if the last heartbeat response got through
String groomName = status.getGroomName();
@@ -364,6 +365,8 @@ public class BSPMaster implements JobSub
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
+ updateTaskStatuses(status);
+
// Check for new tasks to be executed on the groom server
if (acceptNewTasks) {
GroomServerStatus groomStatus = getGroomServer(groomName);
@@ -371,21 +374,21 @@ public class BSPMaster implements JobSub
LOG.warn("Unknown task tracker polling; ignoring: " + groomName);
} else {
List<Task> taskList = taskScheduler.assignTasks(groomStatus);
- LOG.debug("BSPMaster.heartbeat.taskSize: " + taskList.size());
+
for (Task task : taskList) {
if (task != null) {
- actions.add(new LaunchTaskAction(task));
+
+ if (!jobs.get(task.getJobID()).getStatus().isJobComplete()) {
+ if (jobs.get(task.getJobID()).getStatus().getRunState() != JobStatus.RUNNING) {
+ actions.add(new LaunchTaskAction(task));
+ }
+ }
+
}
}
}
}
- // Check for tasks to be killed
- List<GroomServerAction> killTasksList = getTasksToKill(groomName);
- if (killTasksList != null) {
- actions.addAll(killTasksList);
- }
-
response.setActions(actions.toArray(new GroomServerAction[actions.size()]));
groomToHeartbeatResponseMap.put(groomName, response);
@@ -394,6 +397,31 @@ public class BSPMaster implements JobSub
return response;
}
+ void updateTaskStatuses(GroomServerStatus status) {
+ for (Iterator<TaskStatus> it = status.taskReports(); it.hasNext();) {
+ TaskStatus report = it.next();
+ report.setGroomServer(status.getGroomName());
+ String taskId = report.getTaskId();
+ TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+
+ if (tip == null) {
+ LOG.info("Serious problem. While updating status, cannot find taskid "
+ + report.getTaskId());
+ } else {
+ JobInProgress job = tip.getJob();
+
+ if (report.getRunState() == TaskStatus.State.SUCCEEDED) {
+ job.completedTask(tip, report);
+ } else if (report.getRunState() == TaskStatus.State.FAILED) {
+ // Tell the job to fail the relevant task
+ } else {
+ job.updateTaskStatus(tip, report);
+ }
+ }
+
+ }
+ }
+
// (trackerID -> TreeSet of completed taskids running at that tracker)
TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap<String, Set<String>>();
@@ -593,6 +621,7 @@ public class BSPMaster implements JobSub
private synchronized void killJob(JobInProgress job) {
LOG.info("Killing job " + job.getJobID());
+ job.kill();
}
@Override
@@ -626,21 +655,20 @@ public class BSPMaster implements JobSub
TaskInProgress taskInProgress) {
LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId()
+ ", for tracker '" + groomServer + "'");
- /*
+
// taskid --> tracker
- taskidToTrackerMap.put(taskid, taskTracker);
+ taskidToTrackerMap.put(taskid, groomServer);
// tracker --> taskid
- TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);
+ TreeSet taskset = (TreeSet) trackerToTaskMap.get(groomServer);
if (taskset == null) {
- taskset = new TreeSet();
- trackerToTaskMap.put(taskTracker, taskset);
+ taskset = new TreeSet();
+ trackerToTaskMap.put(groomServer, taskset);
}
taskset.add(taskid);
// taskid --> TIP
- taskidToTIPMap.put(taskid, tip);
- */
+ taskidToTIPMap.put(taskid, taskInProgress);
}
public static void main(String[] args) {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Oct 4 09:11:04 2010
@@ -170,6 +170,10 @@ public class BSPPeer implements Watcher,
peer.put(messages.next());
}
}
+
+ // Should we be clearing outgoingQueues?
+ this.outgoingQueues.clear();
+
enterBarrier();
Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work
// because
@@ -228,7 +232,7 @@ public class BSPPeer implements Watcher,
@Override
public void close() throws IOException {
-
+ server.stop();
}
@Override
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Mon Oct 4 09:11:04 2010
@@ -7,7 +7,7 @@ public class BSPTask extends Task {
private BSP bsp;
private Configuration conf;
- public BSPTask(String jobId, String jobFile, String taskid, int partition, Configuration conf) {
+ public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskid;
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Mon Oct 4 09:11:04 2010
@@ -82,10 +82,10 @@ public class GroomServer implements Runn
// Job
boolean acceptNewTasks = true;
private int failures;
- private int maxCurrentTasks;
+ private int maxCurrentTasks = 1;
Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
/** Map from taskId -> TaskInProgress. */
- Map<TaskAttemptID, TaskInProgress> runningTasks = null;
+ Map<String, TaskInProgress> runningTasks = null;
Map<BSPJobID, RunningJob> runningJobs = null;
private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
@@ -120,7 +120,7 @@ public class GroomServer implements Runn
// Clear out state tables
this.tasks.clear();
this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
- this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+ this.runningTasks = new LinkedHashMap<String, TaskInProgress>();
this.acceptNewTasks = true;
this.groomServerName = "groomd_" + localHostname;
@@ -221,6 +221,8 @@ public class GroomServer implements Runn
+ ((actions != null) ? actions.length : 0) + " actions");
if (actions != null) {
+ acceptNewTasks = false;
+
for (GroomServerAction action : actions) {
if (action instanceof LaunchTaskAction) {
startNewTask((LaunchTaskAction) action);
@@ -264,9 +266,10 @@ public class GroomServer implements Runn
}
private void startNewTask(LaunchTaskAction action) {
- TaskInProgress tip = new TaskInProgress(action.getTask());
+ TaskInProgress tip = new TaskInProgress(action.getTask(), this.groomServerName);
synchronized (tip) {
try {
+ runningTasks.put(action.getTask().getTaskID(), tip);
tip.launchTask();
} catch (Throwable ie) {
// TODO: when job failed.
@@ -293,7 +296,21 @@ public class GroomServer implements Runn
// TODO - Later, acceptNewTask is to be set by the status of groom server.
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
- justStarted, justInited, acceptNewTasks, heartbeatResponseId);
+ justStarted, justInited, acceptNewTasks, heartbeatResponseId, status.getTaskReports().size());
+
+
+ synchronized (this) {
+ for (TaskStatus taskStatus : status.getTaskReports()) {
+ if(taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ LOG.debug("Removing task from runningTasks: " + taskStatus.getTaskId());
+ runningTasks.remove(taskStatus.getTaskId());
+ }
+ }
+ }
+
+ // Force a rebuild of 'status' on the next iteration
+ status = null;
+
return heartbeatResponse;
}
@@ -387,18 +404,19 @@ public class GroomServer implements Runn
volatile boolean wasKilled = false;
private TaskStatus taskStatus;
- public TaskInProgress(Task task) {
+ public TaskInProgress(Task task, String groomServer) {
this.task = task;
+ this.taskStatus = new TaskStatus(task.getTaskID(), 0, TaskStatus.State.UNASSIGNED, "running", groomServer, TaskStatus.Phase.STARTING);
}
static final String SUBDIR = "groomServer";
public void launchTask() {
- // until job is completed, don't accept new task
- acceptNewTasks = false;
-
+ taskStatus.setRunState(TaskStatus.State.RUNNING);
+
try {
// TODO: need to move this code to TaskRunner
+
task.getJobFile();
conf.addResource(task.getJobFile());
BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
@@ -408,14 +426,12 @@ public class GroomServer implements Runn
Path localJarFile =
defaultJobConf.getLocalPath(SUBDIR+"/"+task.getTaskID()+"/"+"job.jar");
- LOG.debug("localJobFile: "+ localJobFile);
-
systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml", ".jar")), localJarFile);
HamaConfiguration conf = new HamaConfiguration();
conf.addResource(localJobFile);
- BSPJob jobConf = new BSPJob(conf, task.getJobID());
+ BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
jobConf.setJar(localJarFile.toString());
BSP bsp = (BSP) ReflectionUtils.newInstance(jobConf.getBspClass(), conf);
@@ -425,9 +441,24 @@ public class GroomServer implements Runn
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
+ } finally {
+
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ // If local/outgoing queues are empty, task is done.
+ if(bspPeer.localQueue.size() == 0 && bspPeer.outgoingQueues.size() == 0) {
+ taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ acceptNewTasks = true;
+ break;
+ }
+ }
}
- // TODO: report the task status
}
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Mon Oct 4 09:11:04 2010
@@ -18,11 +18,14 @@
package org.apache.hama.bsp;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
/*************************************************************
@@ -51,6 +54,7 @@ class JobInProgress {
Path jobFile = null;
Path localJobFile = null;
Path localJarFile = null;
+ private LocalFileSystem localFs;
long startTime;
long launchTime;
@@ -58,14 +62,17 @@ class JobInProgress {
// private LocalFileSystem localFs;
private BSPJobID jobId;
-
final BSPMaster master;
+ List<TaskInProgress> tasks;
public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
throws IOException {
this.conf = conf;
this.jobId = jobId;
+ this.tasks = new ArrayList<TaskInProgress>();
+ this.localFs = (LocalFileSystem) FileSystem.getNamed("local", conf);
+
this.master = master;
this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
this.startTime = System.currentTimeMillis();
@@ -76,10 +83,7 @@ class JobInProgress {
+ ".xml");
this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+ ".jar");
-
- LOG.debug("JobInProgress.localJobFile: " + this.localJobFile);
- LOG.debug("JobInProgress.localJarFile: " + this.localJarFile);
-
+
Path jobDir = master.getSystemDirectoryForJob(jobId);
FileSystem fs = jobDir.getFileSystem(conf);
jobFile = new Path(jobDir, "job.xml");
@@ -93,6 +97,7 @@ class JobInProgress {
if (jarFile != null) {
fs.copyToLocalFile(new Path(jarFile), localJarFile);
}
+
}
// ///////////////////////////////////////////////////
@@ -136,14 +141,83 @@ class JobInProgress {
// ///////////////////////////////////////////////////
public synchronized Task obtainNewTask(GroomServerStatus status,
int clusterSize, int numUniqueHosts) {
+ LOG.debug("clusterSize: " + clusterSize);
+
Task result = null;
try {
- result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this,
- numUniqueHosts).getTaskToRun(status);
+ TaskInProgress tip = new TaskInProgress(getJobID(), this.jobFile
+ .toString(), this.master, this.conf, this, numUniqueHosts);
+ tasks.add(tip);
+ result = tip.getTaskToRun(status);
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
+
+ public void completedTask(TaskInProgress tip, TaskStatus status) {
+ String taskid = status.getTaskId();
+ updateTaskStatus(tip, status);
+ LOG.info("Taskid '" + taskid + "' has finished successfully.");
+ tip.completed(taskid);
+
+ //
+ // If all tasks are complete, then the job is done!
+ //
+
+ boolean allDone = true;
+ for (TaskInProgress taskInProgress : tasks) {
+ if (!taskInProgress.isComplete()) {
+ allDone = false;
+ break;
+ }
+ }
+
+ this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
+ JobStatus.RUNNING);
+
+ if(allDone) {
+ LOG.debug("Job successfully done.");
+
+ this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
+ JobStatus.SUCCEEDED);
+ garbageCollect();
+ }
+ }
+
+ public void updateTaskStatus(TaskInProgress tip, TaskStatus status) {
+ tip.updateStatus(status); // update tip
+ }
+
+ public void kill() {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * The job is dead. We're now GC'ing it, getting rid of the job from all
+ * tables. Be sure to remove all of this job's tasks from the various tables.
+ */
+ synchronized void garbageCollect() {
+ try {
+ // Definitely remove the local-disk copy of the job file
+ if (localJobFile != null) {
+ localFs.delete(localJobFile, true);
+ localJobFile = null;
+ }
+ if (localJarFile != null) {
+ localFs.delete(localJarFile, true);
+ localJarFile = null;
+ }
+
+ // JobClient always creates a new directory with job files
+ // so we remove that directory to cleanup
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(new Path(profile.getJobFile()).getParent(), true);
+
+ } catch (IOException e) {
+ LOG.info("Error cleaning up " + profile.getJobID() + ": " + e);
+ }
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Mon Oct 4 09:11:04 2010
@@ -155,7 +155,7 @@ public class LocalJobRunner implements J
try {
GroomServer servers = new GroomServer(conf);
- Task task = new BSPTask(job.getJobID().getJtIdentifier(), jobFile, tID.toString(), i, this.conf);
+ Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
servers.assignTask(task);
LOG.info("Adding task '" + tID.toString() + "' for '" + servers.getServerName() + "'");
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Oct 4 09:11:04 2010
@@ -39,7 +39,7 @@ class SimpleTaskScheduler extends TaskSc
+ (jobQueue.size() + 1) + ")");
jobQueue.add(job);
}
-
+
// removes job
public void removeJob(JobInProgress job) {
jobQueue.remove(job);
@@ -77,23 +77,26 @@ class SimpleTaskScheduler extends TaskSc
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
- /*
- * if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue;
- * }
- */
-
- Task t = null;
-
- t = job.obtainNewTask(groomStatus, numGroomServers,
- groomServerManager.getNumberOfUniqueHosts());
-
- if (t != null) {
- assignedTasks.add(t);
- break; // TODO - Now, simple scheduler assigns only one task to
- // each groom. Later, it will be improved for scheduler to
- // assign one or more tasks to each groom according to
- // its capacity.
+ if (!job.getStatus().isJobComplete() && job.getStatus().getRunState() != JobStatus.RUNNING) {
+ /*
+ * if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+ * continue; }
+ */
+
+ Task t = null;
+
+ t = job.obtainNewTask(groomStatus, numGroomServers,
+ groomServerManager.getNumberOfUniqueHosts());
+
+ if (t != null) {
+ assignedTasks.add(t);
+ break; // TODO - Now, simple scheduler assigns only one task to
+ // each groom. Later, it will be improved for scheduler to
+ // assign one or more tasks to each groom according to
+ // its capacity.
+ }
}
+
}
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Mon Oct 4 09:11:04 2010
@@ -36,7 +36,7 @@ public class Task implements Writable {
// Fields
////////////////////////////////////////////
- protected String jobId;
+ protected BSPJobID jobId;
protected String jobFile;
protected String taskId;
protected int partition;
@@ -44,10 +44,11 @@ public class Task implements Writable {
protected LocalDirAllocator lDirAlloc;
public Task() {
+ jobId = new BSPJobID();
taskId = new String();
}
- public Task(String jobId, String jobFile, String taskId, int partition) {
+ public Task(BSPJobID jobId, String jobFile, String taskId, int partition) {
this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskId;
@@ -73,7 +74,7 @@ public class Task implements Writable {
* Get the job name for this task.
* @return the job name
*/
- public String getJobID() {
+ public BSPJobID getJobID() {
return jobId;
}
@@ -95,7 +96,7 @@ public class Task implements Writable {
////////////////////////////////////////////
@Override
public void write(DataOutput out) throws IOException {
- Text.writeString(out, jobId);
+ jobId.write(out);
Text.writeString(out, jobFile);
Text.writeString(out, taskId);
out.writeInt(partition);
@@ -103,7 +104,7 @@ public class Task implements Writable {
@Override
public void readFields(DataInput in) throws IOException {
- jobId = Text.readString(in);
+ jobId.readFields(in);
jobFile = Text.readString(in);
taskId = Text.readString(in);
partition = in.readInt();
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Mon Oct 4 09:11:04 2010
@@ -68,7 +68,7 @@ class TaskInProgress {
/**
* Map from taskId -> TaskStatus
*/
- private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
+ private TreeMap<String, TaskStatus> taskStatuses = new TreeMap<String, TaskStatus>();
private BSPJobID jobId;
@@ -92,7 +92,7 @@ class TaskInProgress {
String taskid = null;
if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
- taskid = new String("task_" + nextTaskId);
+ taskid = new String("task_" + status.getGroomName() + "_" + nextTaskId);
++nextTaskId;
} else {
LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
@@ -100,8 +100,7 @@ class TaskInProgress {
return null;
}
- //this.conf.set(Constants.PEER_PORT, String.valueOf(30000));
- t = new BSPTask(jobId.getJtIdentifier(), jobFile, taskid, partition, this.conf);
+ t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
activeTasks.put(taskid, status.getGroomName());
// Ask JobTracker to note that the task exists
@@ -127,9 +126,13 @@ class TaskInProgress {
}
public TaskID getTIPId() {
- return this.id;
+ return id;
}
+ public TreeMap<String, String> getTasks() {
+ return activeTasks;
+ }
+
/**
* Is the Task associated with taskid is the first attempt of the tip?
*
@@ -171,4 +174,23 @@ class TaskInProgress {
return false;
}
}
+
+ public void completed(String taskid) {
+ LOG.info("Task '" + taskid + "' has completed.");
+ TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+ status.setRunState(TaskStatus.State.SUCCEEDED);
+ activeTasks.remove(taskid);
+
+ //
+ // Now that the TIP is complete, the other speculative
+ // subtasks will be closed when the owning tasktracker
+ // reports in and calls shouldClose() on this object.
+ //
+
+ this.completes++;
+ }
+
+ public void updateStatus(TaskStatus status) {
+ taskStatuses.put(status.getTaskId(), status);
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Mon Oct 4 09:11:04 2010
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-class TaskStatus implements Writable {
+class TaskStatus implements Writable, Cloneable {
static final Log LOG = LogFactory.getLog(TaskStatus.class);
// enumeration for reporting current phase of a task.
@@ -40,7 +40,7 @@ class TaskStatus implements Writable {
RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
}
- private final TaskAttemptID taskId;
+ private String taskId;
private float progress;
private volatile State runState;
private String stateString;
@@ -55,10 +55,10 @@ class TaskStatus implements Writable {
*
*/
public TaskStatus() {
- taskId = new TaskAttemptID();
+ taskId = new String();
}
- public TaskStatus(TaskAttemptID taskId, float progress, State runState,
+ public TaskStatus(String taskId, float progress, State runState,
String stateString, String groomServer, Phase phase) {
this.taskId = taskId;
this.progress = progress;
@@ -72,7 +72,7 @@ class TaskStatus implements Writable {
// Accessors and Modifiers
// //////////////////////////////////////////////////
- public TaskAttemptID getTaskId() {
+ public String getTaskId() {
return taskId;
}
@@ -225,7 +225,7 @@ class TaskStatus implements Writable {
@Override
public void readFields(DataInput in) throws IOException {
- this.taskId.readFields(in);
+ this.taskId = Text.readString(in);
this.progress = in.readFloat();
this.runState = WritableUtils.readEnum(in, State.class);
this.stateString = Text.readString(in);
@@ -236,7 +236,7 @@ class TaskStatus implements Writable {
@Override
public void write(DataOutput out) throws IOException {
- taskId.write(out);
+ Text.writeString(out, taskId);
out.writeFloat(progress);
WritableUtils.writeEnum(out, runState);
Text.writeString(out, stateString);
Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java?rev=1004154&r1=1004153&r2=1004154&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java Mon Oct 4 09:11:04 2010
@@ -25,7 +25,7 @@ import org.apache.hama.bsp.HeartbeatResp
public interface InterTrackerProtocol extends HamaRPCProtocolVersion {
public HeartbeatResponse heartbeat(GroomServerStatus status,
boolean restarted, boolean initialContact, boolean acceptNewTasks,
- short responseId) throws IOException;
+ short responseId, int reportSize) throws IOException;
public String getSystemDir();
}