You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/12/15 11:51:52 UTC

svn commit: r1049495 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/bsp/

Author: edwardyoon
Date: Wed Dec 15 10:51:52 2010
New Revision: 1049495

URL: http://svn.apache.org/viewvc?rev=1049495&view=rev
Log:
Implementation of job submit command

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1049495&r1=1049494&r2=1049495&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Dec 15 10:51:52 2010
@@ -50,7 +50,8 @@ Trunk (unreleased changes)
     HAMA-2: The intial donation of Hama from the google project (edwardyoon)
 
   IMPROVEMENTS
-    
+ 
+    HAMA-340: Implementation of job submit command (edwardyoon)   
     HAMA-278: Few minor refactoring (edwardyoon)
     HAMA-336: The all taskid variable's type should be declared as a TaskAttemptID
                        (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java?rev=1049495&r1=1049494&r2=1049495&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java Wed Dec 15 10:51:52 2010
@@ -22,6 +22,7 @@ package org.apache.hama;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Adds Hama configuration files to a Configuration
@@ -33,6 +34,11 @@ public class HamaConfiguration extends C
     addHamaResources();
   }
 
+  public HamaConfiguration(Path confFile) {
+    super();
+    this.addResource(confFile);
+  }
+  
   /**
    * Create a clone of passed configuration.
    * 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1049495&r1=1049494&r2=1049495&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Wed Dec 15 10:51:52 2010
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Random;
 
 import javax.security.auth.login.LoginException;
 
@@ -264,9 +265,13 @@ public class BSPJobClient extends Config
     return submitJobInternal(job);
   }
 
+  static Random r = new Random();
+
   public RunningJob submitJobInternal(BSPJob job) throws IOException {
     BSPJobID jobId = jobSubmitClient.getNewJobId();
-    Path submitJobDir = new Path(getSystemDir(), jobId.toString());
+
+    Path submitJobDir = new Path(getSystemDir(), "submit_"
+        + Integer.toString(Math.abs(r.nextInt()), 36));
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitJobFile = new Path(submitJobDir, "job.xml");
 
@@ -462,6 +467,8 @@ public class BSPJobClient extends Config
     boolean listAllJobs = false;
     boolean listActiveGrooms = false;
     boolean killJob = false;
+    boolean submitJob = false;
+    String submitJobFile = null;
     String jobid = null;
 
     HamaConfiguration conf = new HamaConfiguration(getConf());
@@ -483,6 +490,14 @@ public class BSPJobClient extends Config
         return exitCode;
       }
       listActiveGrooms = true;
+    } else if ("-submit".equals(cmd)) {
+      if (args.length == 1) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+
+      submitJob = true;
+      submitJobFile = args[1];
     } else if ("-kill".equals(cmd)) {
       if (args.length == 1) {
         displayUsage(cmd);
@@ -502,6 +517,10 @@ public class BSPJobClient extends Config
     } else if (listActiveGrooms) {
       listActiveGrooms();
       exitCode = 0;
+    } else if (submitJob) {
+      HamaConfiguration tConf = new HamaConfiguration(new Path(submitJobFile));
+      RunningJob job = jc.submitJob(new BSPJob(tConf));
+      System.out.println("Created job " + job.getID().toString());
     } else if (killJob) {
       RunningJob job = jc.getJob(new BSPJobID().forName(jobid));
       if (job == null) {
@@ -533,8 +552,7 @@ public class BSPJobClient extends Config
     } else if ("-list-active-grooms".equals(cmd)) {
       System.err.println(prefix + "[" + cmd + "]");
     } else if ("-list-attempt-ids".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd
-          + " <job-id> <task-state>]. "
+      System.err.println(prefix + "[" + cmd + " <job-id> <task-state>]. "
           + "Valid values for <task-state> are " + taskStates);
     } else {
       System.err.printf(prefix + "<command> <args>\n");
@@ -543,8 +561,7 @@ public class BSPJobClient extends Config
       System.err.printf("\t[-kill <job-id>]\n");
       System.err.printf("\t[-list [all]]\n");
       System.err.printf("\t[-list-active-grooms]\n");
-      System.err.println("\t[-list-attempt-ids <job-id> "
-          + "<task-state>]\n");
+      System.err.println("\t[-list-attempt-ids <job-id> " + "<task-state>]\n");
       System.err.printf("\t[-kill-task <task-id>]\n");
       System.err.printf("\t[-fail-task <task-id>]\n\n");
     }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1049495&r1=1049494&r2=1049495&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Wed Dec 15 10:51:52 2010
@@ -535,7 +535,7 @@ public class BSPMaster implements JobSub
       return jobs.get(jobID).getStatus();
     }
 
-    JobInProgress job = new JobInProgress(jobID, this, this.conf);
+    JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this, this.conf);
     return addJob(jobID, job);
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1049495&r1=1049494&r2=1049495&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Wed Dec 15 10:51:52 2010
@@ -69,12 +69,12 @@ class JobInProgress {
   int numBSPTasks = 0;
   int clusterSize;
 
-  public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
+  public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master, Configuration conf)
       throws IOException {
     this.conf = conf;
     this.jobId = jobId;
     this.localFs = FileSystem.getLocal(conf);
-
+    this.jobFile = jobFile;
     this.master = master;
     this.status = new JobStatus(jobId, null, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
@@ -88,7 +88,6 @@ class JobInProgress {
 
     Path jobDir = master.getSystemDirectoryForJob(jobId);
     FileSystem fs = jobDir.getFileSystem(conf);
-    jobFile = new Path(jobDir, "job.xml");
     fs.copyToLocalFile(jobFile, localJobFile);
     BSPJob job = new BSPJob(jobId, localJobFile.toString());
     this.numBSPTasks = job.getNumBspTask();
@@ -155,6 +154,8 @@ class JobInProgress {
       return;
     }
 
+    LOG.debug("numBSPTasks: " + numBSPTasks);
+    
     // adjust number of map tasks to actual number of splits
     this.tasks = new TaskInProgress[numBSPTasks];
     for (int i = 0; i < numBSPTasks; i++) {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java?rev=1049495&r1=1049494&r2=1049495&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java Wed Dec 15 10:51:52 2010
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
-/*
+/**
  * A JobProfile tracks job's status
  */
 public class JobProfile implements Writable {