You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/19 04:43:26 UTC

svn commit: r1024096 - in /incubator/hama/trunk: CHANGES.txt src/java/org/apache/hama/bsp/GroomServer.java

Author: edwardyoon
Date: Tue Oct 19 02:43:25 2010
New Revision: 1024096

URL: http://svn.apache.org/viewvc?rev=1024096&view=rev
Log:
Uncomment code in checkLocalDirs()

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024096&r1=1024095&r2=1024096&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Oct 19 02:43:25 2010
@@ -171,6 +171,7 @@ Trunk (unreleased changes)
 
   BUG FIXES
   
+    HAMA-315: Uncomment code in checkLocalDirs() (edwardyoon)
     HAMA-314: Remove unnecessary methods from HamaConfiguration (edwardyoon)
     HAMA-307: BSPMaster - job ID counter is not read and updated atomically
                       (Filipe Manana via edward)

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024096&r1=1024095&r2=1024096&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Oct 19 02:43:25 2010
@@ -92,12 +92,12 @@ public class GroomServer implements Runn
   public GroomServer(Configuration conf) throws IOException {
     LOG.info("groom start");
     this.conf = conf;
-    
+
     String mode = conf.get("bsp.master.address");
     if (!mode.equals("local")) {
       bspMasterAddr = BSPMaster.getAddress(conf);
     }
-    
+
     // FileSystem local = FileSystem.getLocal(conf);
     // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
   }
@@ -154,8 +154,8 @@ public class GroomServer implements Runn
       }
     }
 
-    // if (!writable)
-    // throw new DiskErrorException("all local directories are not writable");
+    if (!writable)
+      throw new DiskErrorException("all local directories are not writable");
   }
 
   public String[] getLocalDirs() {
@@ -229,7 +229,7 @@ public class GroomServer implements Runn
 
         if (actions != null) {
           acceptNewTasks = false;
-          
+
           for (GroomServerAction action : actions) {
             if (action instanceof LaunchTaskAction) {
               startNewTask((LaunchTaskAction) action);
@@ -273,7 +273,8 @@ public class GroomServer implements Runn
   }
 
   private void startNewTask(LaunchTaskAction action) {
-    TaskInProgress tip = new TaskInProgress(action.getTask(), this.groomServerName);
+    TaskInProgress tip = new TaskInProgress(action.getTask(),
+        this.groomServerName);
 
     synchronized (this) {
       runningTasks.put(action.getTask().getTaskID(), tip);
@@ -304,21 +305,22 @@ public class GroomServer implements Runn
 
     // TODO - Later, acceptNewTask is to be set by the status of groom server.
     HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
-        justStarted, justInited, acceptNewTasks, heartbeatResponseId, status.getTaskReports().size());
-      
-    
+        justStarted, justInited, acceptNewTasks, heartbeatResponseId, status
+            .getTaskReports().size());
+
     synchronized (this) {
       for (TaskStatus taskStatus : status.getTaskReports()) {
-        if(taskStatus.getRunState() != TaskStatus.State.RUNNING) {
-          LOG.debug("Removing task from runningTasks: " + taskStatus.getTaskId());
+        if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+          LOG.debug("Removing task from runningTasks: "
+              + taskStatus.getTaskId());
           runningTasks.remove(taskStatus.getTaskId());
         }
       }
     }
-      
+
     // Force a rebuild of 'status' on the next iteration
     status = null;
-    
+
     return heartbeatResponse;
   }
 
@@ -414,35 +416,39 @@ public class GroomServer implements Runn
 
     public TaskInProgress(Task task, String groomServer) {
       this.task = task;
-      this.taskStatus = new TaskStatus(task.getTaskID(), 0, TaskStatus.State.UNASSIGNED, "running", groomServer, TaskStatus.Phase.STARTING);
+      this.taskStatus = new TaskStatus(task.getTaskID(), 0,
+          TaskStatus.State.UNASSIGNED, "running", groomServer,
+          TaskStatus.Phase.STARTING);
     }
 
     static final String SUBDIR = "groomServer";
-    
+
     public void launchTask() {
       taskStatus.setRunState(TaskStatus.State.RUNNING);
-      
+
       try {
         // TODO: need to move this code to TaskRunner
-        
+
         task.getJobFile();
         conf.addResource(task.getJobFile());
         BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
 
-        Path localJobFile =
-          defaultJobConf.getLocalPath(SUBDIR+"/"+task.getTaskID()+"/"+"job.xml");
-        Path localJarFile =
-          defaultJobConf.getLocalPath(SUBDIR+"/"+task.getTaskID()+"/"+"job.jar");
-        
+        Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+            + task.getTaskID() + "/" + "job.xml");
+        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+            + task.getTaskID() + "/" + "job.jar");
+
         systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
-        systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml", ".jar")), localJarFile);
+        systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
+            ".jar")), localJarFile);
 
         HamaConfiguration conf = new HamaConfiguration();
         conf.addResource(localJobFile);
         BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
         jobConf.setJar(localJarFile.toString());
-        
-        BSP bsp = (BSP) ReflectionUtils.newInstance(jobConf.getBspClass(), conf);
+
+        BSP bsp = (BSP) ReflectionUtils
+            .newInstance(jobConf.getBspClass(), conf);
         bsp.setPeer(bspPeer);
         try {
           bsp.runBSP();
@@ -450,21 +456,22 @@ public class GroomServer implements Runn
           e.printStackTrace();
           taskStatus.setRunState(TaskStatus.State.FAILED);
         }
-        
+
       } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
       } finally {
-        
+
         while (true) {
           try {
             Thread.sleep(1000);
           } catch (InterruptedException e) {
             e.printStackTrace();
           }
-          
+
           // If local/outgoing queues are empty, task is done.
-          if(bspPeer.localQueue.size() == 0 && bspPeer.outgoingQueues.size() == 0) {
+          if (bspPeer.localQueue.size() == 0
+              && bspPeer.outgoingQueues.size() == 0) {
             taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
             acceptNewTasks = true;
             break;