You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/19 04:43:26 UTC
svn commit: r1024096 - in /incubator/hama/trunk: CHANGES.txt src/java/org/apache/hama/bsp/GroomServer.java
Author: edwardyoon
Date: Tue Oct 19 02:43:25 2010
New Revision: 1024096
URL: http://svn.apache.org/viewvc?rev=1024096&view=rev
Log:
Uncomment code in checkLocalDirs()
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024096&r1=1024095&r2=1024096&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Oct 19 02:43:25 2010
@@ -171,6 +171,7 @@ Trunk (unreleased changes)
BUG FIXES
+ HAMA-315: Uncomment code in checkLocalDirs() (edwardyoon)
HAMA-314: Remove unnecessary methods from HamaConfiguration (edwardyoon)
HAMA-307: BSPMaster - job ID counter is not read and updated atomically
(Filipe Manana via edward)
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024096&r1=1024095&r2=1024096&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Oct 19 02:43:25 2010
@@ -92,12 +92,12 @@ public class GroomServer implements Runn
public GroomServer(Configuration conf) throws IOException {
LOG.info("groom start");
this.conf = conf;
-
+
String mode = conf.get("bsp.master.address");
if (!mode.equals("local")) {
bspMasterAddr = BSPMaster.getAddress(conf);
}
-
+
// FileSystem local = FileSystem.getLocal(conf);
// this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
}
@@ -154,8 +154,8 @@ public class GroomServer implements Runn
}
}
- // if (!writable)
- // throw new DiskErrorException("all local directories are not writable");
+ if (!writable)
+ throw new DiskErrorException("all local directories are not writable");
}
public String[] getLocalDirs() {
@@ -229,7 +229,7 @@ public class GroomServer implements Runn
if (actions != null) {
acceptNewTasks = false;
-
+
for (GroomServerAction action : actions) {
if (action instanceof LaunchTaskAction) {
startNewTask((LaunchTaskAction) action);
@@ -273,7 +273,8 @@ public class GroomServer implements Runn
}
private void startNewTask(LaunchTaskAction action) {
- TaskInProgress tip = new TaskInProgress(action.getTask(), this.groomServerName);
+ TaskInProgress tip = new TaskInProgress(action.getTask(),
+ this.groomServerName);
synchronized (this) {
runningTasks.put(action.getTask().getTaskID(), tip);
@@ -304,21 +305,22 @@ public class GroomServer implements Runn
// TODO - Later, acceptNewTask is to be set by the status of groom server.
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
- justStarted, justInited, acceptNewTasks, heartbeatResponseId, status.getTaskReports().size());
-
-
+ justStarted, justInited, acceptNewTasks, heartbeatResponseId, status
+ .getTaskReports().size());
+
synchronized (this) {
for (TaskStatus taskStatus : status.getTaskReports()) {
- if(taskStatus.getRunState() != TaskStatus.State.RUNNING) {
- LOG.debug("Removing task from runningTasks: " + taskStatus.getTaskId());
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ LOG.debug("Removing task from runningTasks: "
+ + taskStatus.getTaskId());
runningTasks.remove(taskStatus.getTaskId());
}
}
}
-
+
// Force a rebuild of 'status' on the next iteration
status = null;
-
+
return heartbeatResponse;
}
@@ -414,35 +416,39 @@ public class GroomServer implements Runn
public TaskInProgress(Task task, String groomServer) {
this.task = task;
- this.taskStatus = new TaskStatus(task.getTaskID(), 0, TaskStatus.State.UNASSIGNED, "running", groomServer, TaskStatus.Phase.STARTING);
+ this.taskStatus = new TaskStatus(task.getTaskID(), 0,
+ TaskStatus.State.UNASSIGNED, "running", groomServer,
+ TaskStatus.Phase.STARTING);
}
static final String SUBDIR = "groomServer";
-
+
public void launchTask() {
taskStatus.setRunState(TaskStatus.State.RUNNING);
-
+
try {
// TODO: need to move this code to TaskRunner
-
+
task.getJobFile();
conf.addResource(task.getJobFile());
BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
- Path localJobFile =
- defaultJobConf.getLocalPath(SUBDIR+"/"+task.getTaskID()+"/"+"job.xml");
- Path localJarFile =
- defaultJobConf.getLocalPath(SUBDIR+"/"+task.getTaskID()+"/"+"job.jar");
-
+ Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+ + task.getTaskID() + "/" + "job.xml");
+ Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+ + task.getTaskID() + "/" + "job.jar");
+
systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
- systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml", ".jar")), localJarFile);
+ systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
+ ".jar")), localJarFile);
HamaConfiguration conf = new HamaConfiguration();
conf.addResource(localJobFile);
BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
jobConf.setJar(localJarFile.toString());
-
- BSP bsp = (BSP) ReflectionUtils.newInstance(jobConf.getBspClass(), conf);
+
+ BSP bsp = (BSP) ReflectionUtils
+ .newInstance(jobConf.getBspClass(), conf);
bsp.setPeer(bspPeer);
try {
bsp.runBSP();
@@ -450,21 +456,22 @@ public class GroomServer implements Runn
e.printStackTrace();
taskStatus.setRunState(TaskStatus.State.FAILED);
}
-
+
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
-
+
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
-
+
// If local/outgoing queues are empty, task is done.
- if(bspPeer.localQueue.size() == 0 && bspPeer.outgoingQueues.size() == 0) {
+ if (bspPeer.localQueue.size() == 0
+ && bspPeer.outgoingQueues.size() == 0) {
taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
acceptNewTasks = true;
break;