You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/11/11 09:44:57 UTC

svn commit: r1033838 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/

Author: edwardyoon
Date: Thu Nov 11 08:44:57 2010
New Revision: 1033838

URL: http://svn.apache.org/viewvc?rev=1033838&view=rev
Log:
BSP program doesn't work when the number of desired tasks is smaller than the cluster size

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1033838&r1=1033837&r2=1033838&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Nov 11 08:44:57 2010
@@ -181,6 +181,8 @@ Trunk (unreleased changes)
 
   BUG FIXES
   
+    HAMA-330: BSP program doesn't work when the desired tasks smaller than 
+                       cluster size (edwardyoon)
     HAMA-326: GroomServer should wait until a job finishes (edwardyoon)
     HAMA-324: Removing javadoc warnings and useless libraries (edwardyoon)
     HAMA-322: Make sure failed assertions on test threads are reported

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1033838&r1=1033837&r2=1033838&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Thu Nov 11 08:44:57 2010
@@ -99,7 +99,7 @@ public class PiEstimator {
     // Set the job name
     bsp.setJobName("pi estimation example");
     bsp.setBspClass(MyEstimator.class);
-    bsp.setNumBspTask(10);
+    bsp.setNumBspTask(1);
     
     BSPJobClient jobClient = new BSPJobClient(conf);
     ClusterStatus cluster = jobClient.getClusterStatus(true);

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1033838&r1=1033837&r2=1033838&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Thu Nov 11 08:44:57 2010
@@ -27,6 +27,7 @@ import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.ClusterStatus;
 import org.apache.zookeeper.KeeperException;
 
 public class SerializePrinting {
@@ -44,7 +45,7 @@ public class SerializePrinting {
       int i = 0;
       for (String otherPeer : bspPeer.getAllPeerNames()) {
         if (bspPeer.getPeerName().equals(otherPeer)) {
-          LOG.info("Hello BSP from " + i + " of " + num + ": "
+          LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": "
               + bspPeer.getPeerName());
         }
         
@@ -75,7 +76,11 @@ public class SerializePrinting {
     // Set the job name
     bsp.setJobName("serialize printing");
     bsp.setBspClass(HelloBSP.class);
-    bsp.setNumBspTask(10);
+    
+    // Set the task size as a number of GroomServer
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    bsp.setNumBspTask(cluster.getGroomServers());
     
     BSPJobClient.runJob(bsp);
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1033838&r1=1033837&r2=1033838&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Thu Nov 11 08:44:57 2010
@@ -381,13 +381,7 @@ public class BSPMaster implements JobSub
 
         for (Task task : taskList) {
           if (task != null) {
-
-            if (!jobs.get(task.getJobID()).getStatus().isJobComplete()) {
-              if (jobs.get(task.getJobID()).getStatus().getRunState() != JobStatus.RUNNING) {
                 actions.add(new LaunchTaskAction(task));
-              }
-            }
-
           }
         }
       }
@@ -463,7 +457,6 @@ public class BSPMaster implements JobSub
     LOG.debug("Removing task '" + taskid + "'");
   }
 
-  /*
   private List<GroomServerAction> getTasksToKill(String groomName) {
     Set<String> taskIds = (TreeSet<String>) trackerToTaskMap.get(groomName);
     if (taskIds != null) {
@@ -498,7 +491,6 @@ public class BSPMaster implements JobSub
     return null;
 
   }
-  */
   
   /**
    * Process incoming heartbeat messages from the groom.
@@ -584,6 +576,16 @@ public class BSPMaster implements JobSub
       jobs.put(job.getProfile().getJobID(), job);
       taskScheduler.addJob(job);
     }
+    
+    
+    // TODO Later, we should use the JobInProgressListener -- edwardyoon
+    try {
+      job.initTasks();
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    
     return job.getStatus();
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1033838&r1=1033837&r2=1033838&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Thu Nov 11 08:44:57 2010
@@ -46,6 +46,7 @@ class JobInProgress {
   }
 
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
+  boolean tasksInited = false;
 
   Configuration conf;
   JobProfile profile;
@@ -62,15 +63,16 @@ class JobInProgress {
   // private LocalFileSystem localFs;
   private BSPJobID jobId;
   final BSPMaster master;
-  List<TaskInProgress> tasks;
+  TaskInProgress tasks[] = new TaskInProgress[0];
   private long superstepCounter;
 
+  int numBSPTasks = 0;
+  int clusterSize;
+
   public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
       throws IOException {
     this.conf = conf;
     this.jobId = jobId;
-
-    this.tasks = new ArrayList<TaskInProgress>();
     this.localFs = FileSystem.getLocal(conf);
 
     this.master = master;
@@ -88,7 +90,8 @@ class JobInProgress {
     FileSystem fs = jobDir.getFileSystem(conf);
     jobFile = new Path(jobDir, "job.xml");
     fs.copyToLocalFile(jobFile, localJobFile);
-    BSPJobContext job = new BSPJobContext(localJobFile, jobId);
+    BSPJob job = new BSPJob(jobId, localJobFile.toString());
+    this.numBSPTasks = job.getNumBspTask();
 
     this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
         .getJobName());
@@ -100,9 +103,6 @@ class JobInProgress {
 
   }
 
-  // ///////////////////////////////////////////////////
-  // Accessors for the JobInProgress
-  // ///////////////////////////////////////////////////
   public JobProfile getProfile() {
     return profile;
   }
@@ -124,6 +124,13 @@ class JobInProgress {
   }
 
   /**
+   * @return the number of desired tasks.
+   */
+  public int desiredBSPTasks() {
+    return numBSPTasks;
+  }
+
+  /**
    * @return The JobID of this JobInProgress.
    */
   public BSPJobID getJobID() {
@@ -139,16 +146,45 @@ class JobInProgress {
   // ///////////////////////////////////////////////////
   // Create/manage tasks
   // ///////////////////////////////////////////////////
+
+  public synchronized void initTasks() throws IOException {
+    if (tasksInited) {
+      return;
+    }
+
+    // adjust number of map tasks to actual number of splits
+    this.tasks = new TaskInProgress[numBSPTasks];
+    for (int i = 0; i < numBSPTasks; i++) {
+      tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
+          this.master, this.conf, this, i);
+    }
+
+    // Update job status
+    this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
+       JobStatus.RUNNING);
+
+    tasksInited = true;
+    LOG.debug("Job is initialized.");
+  }
+
   public synchronized Task obtainNewTask(GroomServerStatus status,
       int clusterSize, int numUniqueHosts) {
-    LOG.debug("clusterSize: " + clusterSize);
-
+    this.clusterSize = clusterSize;
+    
+    if (this.status.getRunState() != JobStatus.RUNNING) {
+      LOG.info("Cannot create task split for " + profile.getJobID());
+      return null;
+    }
+    
     Task result = null;
     try {
-      TaskInProgress tip = new TaskInProgress(getJobID(), this.jobFile
-          .toString(), this.master, this.conf, this, numUniqueHosts);
-      tasks.add(tip);
-      result = tip.getTaskToRun(status);
+      for (int i = 0; i < tasks.length; i++) {
+        if(!tasks[i].isRunning()) {
+          result = tasks[i].getTaskToRun(status);
+          break;
+        }
+      }
+      
     } catch (IOException e) {
       e.printStackTrace();
     }
@@ -156,7 +192,7 @@ class JobInProgress {
     return result;
   }
 
-  public void completedTask(TaskInProgress tip, TaskStatus status) {
+  public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
     String taskid = status.getTaskId();
     updateTaskStatus(tip, status);
     LOG.info("Taskid '" + taskid + "' has finished successfully.");
@@ -174,9 +210,6 @@ class JobInProgress {
       }
     }
 
-    this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
-        JobStatus.RUNNING);
-
     if (allDone) {
       LOG.debug("Job successfully done.");
 
@@ -186,13 +219,14 @@ class JobInProgress {
     }
   }
 
-  public synchronized void updateTaskStatus(TaskInProgress tip, TaskStatus taskStatus) {
+  public synchronized void updateTaskStatus(TaskInProgress tip,
+      TaskStatus taskStatus) {
     tip.updateStatus(taskStatus); // update tip
 
     if (superstepCounter < taskStatus.getSuperstepCount()) {
       superstepCounter = taskStatus.getSuperstepCount();
       // TODO Later, we have to update JobInProgress status here
-      
+
     }
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java?rev=1033838&r1=1033837&r2=1033838&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java Thu Nov 11 08:44:57 2010
@@ -26,10 +26,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
-/**************************************************
+/*
  * A JobProfile tracks job's status
- * 
- **************************************************/
+ */
 public class JobProfile implements Writable {
 
   static { // register a ctor
@@ -96,9 +95,6 @@ public class JobProfile implements Writa
     return name;
   }
 
-  // /////////////////////////////////////
-  // Writable
-  // /////////////////////////////////////
   public void write(DataOutput out) throws IOException {
     jobid.write(out);
     Text.writeString(out, jobFile);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1033838&r1=1033837&r2=1033838&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Thu Nov 11 08:44:57 2010
@@ -76,28 +76,24 @@ class SimpleTaskScheduler extends TaskSc
       // instance to the scheduler.
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
+          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+            continue;
+          }
 
-          if (!job.getStatus().isJobComplete() && job.getStatus().getRunState() != JobStatus.RUNNING) {
-            /*
-             * if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-             * continue; }
-             */
-
-            Task t = null;
+          Task t = null;
 
-            t = job.obtainNewTask(groomStatus, numGroomServers,
-                groomServerManager.getNumberOfUniqueHosts());
+          t = job.obtainNewTask(groomStatus, numGroomServers,
+              groomServerManager.getNumberOfUniqueHosts());
 
-            if (t != null) {
-              assignedTasks.add(t);
-              break; // TODO - Now, simple scheduler assigns only one task to
-              // each groom. Later, it will be improved for scheduler to
-              // assign one or more tasks to each groom according to
-              // its capacity.
-            }
+          if (t != null) {
+            assignedTasks.add(t);
+            break; // TODO - Now, simple scheduler assigns only one task to
+            // each groom. Later, it will be improved for scheduler to
+            // assign one or more tasks to each groom according to
+            // its capacity.
           }
-          
         }
+
       }
     }