Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/28 12:55:31 UTC

svn commit: r769330 - in /hadoop/core/trunk: ./ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapred/

Author: sharad
Date: Tue Apr 28 10:55:30 2009
New Revision: 769330

URL: http://svn.apache.org/viewvc?rev=769330&view=rev
Log:
 HADOOP-5680. Change SleepJob to use new mapreduce api. Contributed by Amareshwari Sriramadasu.
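
For context: the old org.apache.hadoop.mapred API hands map() and reduce() an OutputCollector plus a Reporter, while the new org.apache.hadoop.mapreduce API folds both into a single Context object and replaces configure(JobConf) with setup(Context). A minimal sketch of the mapper shape this commit adopts, condensed from the SleepMapper added in the diff below (the class name here is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Mapper;

    public class NewApiMapperSketch
        extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
      protected void setup(Context context) {
        // Replaces the old configure(JobConf): read settings from
        // context.getConfiguration() here.
      }
      public void map(IntWritable key, IntWritable value, Context context)
          throws IOException, InterruptedException {
        context.setStatus("working");            // was reporter.setStatus(...)
        context.write(key, NullWritable.get());  // was output.collect(...)
      }
    }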

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr 28 10:55:30 2009
@@ -256,6 +256,9 @@
     HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by
     TupleWritable encoding. (Jingkei Ly via cdouglas)
 
+    HADOOP-5680. Change org.apache.hadoop.examples.SleepJob to use new 
+    mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java Tue Apr 28 10:55:30 2009
@@ -20,18 +20,18 @@
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.util.Iterator;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -42,64 +42,69 @@
  * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
  * some disk space.
  */
-public class SleepJob extends Configured implements Tool,  
-             Mapper<IntWritable, IntWritable, IntWritable, NullWritable>,
-             Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
-             Partitioner<IntWritable,NullWritable> {
-
-  private long mapSleepDuration = 100;
-  private long reduceSleepDuration = 100;
-  private int mapSleepCount = 1;
-  private int reduceSleepCount = 1;
-  private int count = 0;
+public class SleepJob extends Configured implements Tool {
 
-  public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
-    return k.get() % numPartitions;
+  public static class SleepJobPartitioner extends 
+      Partitioner<IntWritable, NullWritable> {
+    public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+      return k.get() % numPartitions;
+    }
   }
   
-  public static class EmptySplit implements InputSplit {
+  public static class EmptySplit extends InputSplit implements Writable {
     public void write(DataOutput out) throws IOException { }
     public void readFields(DataInput in) throws IOException { }
     public long getLength() { return 0L; }
     public String[] getLocations() { return new String[0]; }
   }
 
-  public static class SleepInputFormat extends Configured
-      implements InputFormat<IntWritable,IntWritable> {
-    public InputSplit[] getSplits(JobConf conf, int numSplits) {
-      InputSplit[] ret = new InputSplit[numSplits];
+  public static class SleepInputFormat 
+      extends InputFormat<IntWritable,IntWritable> {
+    
+    public List<InputSplit> getSplits(JobContext jobContext) {
+      List<InputSplit> ret = new ArrayList<InputSplit>();
+      int numSplits = jobContext.getConfiguration().
+                        getInt("mapred.map.tasks", 1);
       for (int i = 0; i < numSplits; ++i) {
-        ret[i] = new EmptySplit();
+        ret.add(new EmptySplit());
       }
       return ret;
     }
-    public RecordReader<IntWritable,IntWritable> getRecordReader(
-        InputSplit ignored, JobConf conf, Reporter reporter)
+    
+    public RecordReader<IntWritable,IntWritable> createRecordReader(
+        InputSplit ignored, TaskAttemptContext taskContext)
         throws IOException {
+      Configuration conf = taskContext.getConfiguration();
       final int count = conf.getInt("sleep.job.map.sleep.count", 1);
       if (count < 0) throw new IOException("Invalid map count: " + count);
       final int redcount = conf.getInt("sleep.job.reduce.sleep.count", 1);
       if (redcount < 0)
         throw new IOException("Invalid reduce count: " + redcount);
-      final int emitPerMapTask = (redcount * conf.getNumReduceTasks());
-    return new RecordReader<IntWritable,IntWritable>() {
+      final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+      
+      return new RecordReader<IntWritable,IntWritable>() {
         private int records = 0;
         private int emitCount = 0;
+        private IntWritable key = null;
+        private IntWritable value = null;
+        public void initialize(InputSplit split, TaskAttemptContext context) {
+        }
 
-        public boolean next(IntWritable key, IntWritable value)
+        public boolean nextKeyValue()
             throws IOException {
+          key = new IntWritable();
           key.set(emitCount);
           int emit = emitPerMapTask / count;
           if ((emitPerMapTask) % count > records) {
             ++emit;
           }
           emitCount += emit;
+          value = new IntWritable();
           value.set(emit);
           return records++ < count;
         }
-        public IntWritable createKey() { return new IntWritable(); }
-        public IntWritable createValue() { return new IntWritable(); }
-        public long getPos() throws IOException { return records; }
+        public IntWritable getCurrentKey() { return key; }
+        public IntWritable getCurrentValue() { return value; }
         public void close() throws IOException { }
         public float getProgress() throws IOException {
           return records / ((float)count);
@@ -108,93 +113,104 @@
     }
   }
 
-  public void map(IntWritable key, IntWritable value,
-      OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
-      throws IOException {
-
-    //it is expected that every map processes mapSleepCount number of records. 
-    try {
-      reporter.setStatus("Sleeping... (" +
+  public static class SleepMapper 
+      extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+    private long mapSleepDuration = 100;
+    private int mapSleepCount = 1;
+    private int count = 0;
+
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.mapSleepCount =
+        conf.getInt("sleep.job.map.sleep.count", mapSleepCount);
+      this.mapSleepDuration =
+        conf.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;
+    }
+
+    public void map(IntWritable key, IntWritable value, Context context
+               ) throws IOException, InterruptedException {
+      //it is expected that every map processes mapSleepCount number of records. 
+      try {
+        context.setStatus("Sleeping... (" +
           (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
-      Thread.sleep(mapSleepDuration);
-    }
-    catch (InterruptedException ex) {
-      throw (IOException)new IOException(
-          "Interrupted while sleeping").initCause(ex);
-    }
-    ++count;
-    // output reduceSleepCount * numReduce number of random values, so that
-    // each reducer will get reduceSleepCount number of keys.
-    int k = key.get();
-    for (int i = 0; i < value.get(); ++i) {
-      output.collect(new IntWritable(k + i), NullWritable.get());
+        Thread.sleep(mapSleepDuration);
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // output reduceSleepCount * numReduce number of random values, so that
+      // each reducer will get reduceSleepCount number of keys.
+      int k = key.get();
+      for (int i = 0; i < value.get(); ++i) {
+        context.write(new IntWritable(k + i), NullWritable.get());
+      }
     }
   }
+  
+  public static class SleepReducer  
+      extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+    private long reduceSleepDuration = 100;
+    private int reduceSleepCount = 1;
+    private int count = 0;
+
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.reduceSleepCount =
+        conf.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+      this.reduceSleepDuration =
+        conf.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;
+    }
 
-  public void reduce(IntWritable key, Iterator<NullWritable> values,
-      OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+    public void reduce(IntWritable key, Iterable<NullWritable> values,
+                       Context context)
       throws IOException {
-    try {
-      reporter.setStatus("Sleeping... (" +
-          (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+      try {
+        context.setStatus("Sleeping... (" +
+            (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
         Thread.sleep(reduceSleepDuration);
       
-    }
-    catch (InterruptedException ex) {
-      throw (IOException)new IOException(
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
           "Interrupted while sleeping").initCause(ex);
+      }
+      count++;
     }
-    count++;
   }
 
-  public void configure(JobConf job) {
-    this.mapSleepCount =
-      job.getInt("sleep.job.map.sleep.count", mapSleepCount);
-    this.reduceSleepCount =
-      job.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);
-    this.mapSleepDuration =
-      job.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;
-    this.reduceSleepDuration =
-      job.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;
-  }
-
-  public void close() throws IOException {
-  }
-
-  public static void main(String[] args) throws Exception{
+  public static void main(String[] args) throws Exception {
     int res = ToolRunner.run(new Configuration(), new SleepJob(), args);
     System.exit(res);
   }
 
-  public int run(int numMapper, int numReducer, long mapSleepTime,
-      int mapSleepCount, long reduceSleepTime,
-      int reduceSleepCount) throws IOException {
-    JobConf job = setupJobConf(numMapper, numReducer, mapSleepTime, 
-                  mapSleepCount, reduceSleepTime, reduceSleepCount);
-    JobClient.runJob(job);
-    return 0;
-  }
-
-  public JobConf setupJobConf(int numMapper, int numReducer, 
-                                long mapSleepTime, int mapSleepCount, 
-                                long reduceSleepTime, int reduceSleepCount) {
-    JobConf job = new JobConf(getConf(), SleepJob.class);
-    job.setNumMapTasks(numMapper);
+  public Job createJob(int numMapper, int numReducer, 
+                       long mapSleepTime, int mapSleepCount, 
+                       long reduceSleepTime, int reduceSleepCount) 
+      throws IOException {
+    Configuration conf = getConf();
+    conf.setLong("sleep.job.map.sleep.time", mapSleepTime);
+    conf.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
+    conf.setInt("sleep.job.map.sleep.count", mapSleepCount);
+    conf.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+    conf.setInt("mapred.map.tasks", numMapper);
+    Job job = new Job(conf, "sleep");
+    job.setNumReduceTasks(numReducer);
+    job.setJarByClass(SleepJob.class);
     job.setNumReduceTasks(numReducer);
-    job.setMapperClass(SleepJob.class);
+    job.setMapperClass(SleepMapper.class);
     job.setMapOutputKeyClass(IntWritable.class);
     job.setMapOutputValueClass(NullWritable.class);
-    job.setReducerClass(SleepJob.class);
-    job.setOutputFormat(NullOutputFormat.class);
-    job.setInputFormat(SleepInputFormat.class);
-    job.setPartitionerClass(SleepJob.class);
+    job.setReducerClass(SleepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setPartitionerClass(SleepJobPartitioner.class);
     job.setSpeculativeExecution(false);
     job.setJobName("Sleep job");
     FileInputFormat.addInputPath(job, new Path("ignored"));
-    job.setLong("sleep.job.map.sleep.time", mapSleepTime);
-    job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
-    job.setInt("sleep.job.map.sleep.count", mapSleepCount);
-    job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
     return job;
   }
 
@@ -205,7 +221,7 @@
           " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
           " [-recordt recordSleepTime (msec)]");
       ToolRunner.printGenericCommandUsage(System.err);
-      return -1;
+      return 2;
     }
 
     int numMapper = 1, numReducer = 1;
@@ -233,9 +249,9 @@
     // sleep for *SleepTime duration in Task by recSleepTime per record
     mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
     reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
-    
-    return run(numMapper, numReducer, mapSleepTime, mapSleepCount,
-        reduceSleepTime, reduceSleepCount);
+    Job job = createJob(numMapper, numReducer, mapSleepTime,
+                mapSleepCount, reduceSleepTime, reduceSleepCount);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
 }

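The converted example is driven through the new Job handle instead of JobClient.runJob. A hedged sketch of programmatic use, with the createJob signature taken from the diff above (the driver class name is illustrative, and a working cluster configuration is assumed to be on the classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.examples.SleepJob;
    import org.apache.hadoop.mapreduce.Job;

    public class SleepJobDriver {
      public static void main(String[] args) throws Exception {
        SleepJob sleep = new SleepJob();
        sleep.setConf(new Configuration());  // cluster settings assumed
        // 3 maps and 1 reduce; each map sleeps 3000 ms over 1 record,
        // each reduce sleeps 1000 ms over 1 record.
        Job job = sleep.createJob(3, 1, 3000L, 1, 1000L, 1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

The same numbers can instead be passed through ToolRunner on the command line, e.g. the argument list { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" } used by TestTaskTrackerMemoryManager below.
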
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java Tue Apr 28 10:55:30 2009
@@ -255,6 +255,41 @@
   }
 
   /**
+   * Turn speculative execution on or off for this job. 
+   * 
+   * @param speculativeExecution <code>true</code> if speculative execution 
+   *                             should be turned on, else <code>false</code>.
+   */
+  public void setSpeculativeExecution(boolean speculativeExecution) {
+    ensureState(JobState.DEFINE);
+    conf.setSpeculativeExecution(speculativeExecution);
+  }
+
+  /**
+   * Turn speculative execution on or off for this job for map tasks. 
+   * 
+   * @param speculativeExecution <code>true</code> if speculative execution 
+   *                             should be turned on for map tasks,
+   *                             else <code>false</code>.
+   */
+  public void setMapSpeculativeExecution(boolean speculativeExecution) {
+    ensureState(JobState.DEFINE);
+    conf.setMapSpeculativeExecution(speculativeExecution);
+  }
+
+  /**
+   * Turn speculative execution on or off for this job for reduce tasks. 
+   * 
+   * @param speculativeExecution <code>true</code> if speculative execution 
+   *                             should be turned on for reduce tasks,
+   *                             else <code>false</code>.
+   */
+  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
+    ensureState(JobState.DEFINE);
+    conf.setReduceSpeculativeExecution(speculativeExecution);
+  }
+
+  /**
    * Get the URL where some job progress information will be displayed.
    * 
    * @return the URL where some job progress information will be displayed.
@@ -265,6 +300,16 @@
   }
 
   /**
+   * Get the job identifier.
+   * 
+   * @return the job identifier.
+   */
+  public JobID getID() {
+    ensureState(JobState.RUNNING);
+    return info.getID();
+  }
+  
+  /**
    * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
    * and 1.0.  When all map tasks have completed, the function returns 1.0.
    * 

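The methods added here are state-checked: ensureState(JobState.DEFINE) means speculative execution can only be configured before submission, and ensureState(JobState.RUNNING) means getID() is only valid afterwards. A minimal sketch of the required ordering (assumes the Job is otherwise fully configured; names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobID;

    public class SpeculationSketch {
      public static JobID submitQuietly(Configuration conf)
          throws IOException, InterruptedException, ClassNotFoundException {
        Job job = new Job(conf, "sleep");
        // DEFINE state: legal only before submit().
        job.setSpeculativeExecution(false);        // both phases at once
        job.setMapSpeculativeExecution(false);     // map tasks only
        job.setReduceSpeculativeExecution(false);  // reduce tasks only
        job.submit();
        // RUNNING state: getID() is legal only after submit().
        return job.getID();
      }
    }
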
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Tue Apr 28 10:55:30 2009
@@ -34,6 +34,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -78,23 +79,27 @@
     assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
   }
   
-  public void testAllEnabledACLForJobSubmission() throws IOException {
+  public void testAllEnabledACLForJobSubmission() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
     verifyJobSubmission(conf, true);
   }
   
-  public void testAllDisabledACLForJobSubmission() throws IOException {
+  public void testAllDisabledACLForJobSubmission() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
     verifyJobSubmission(conf, false);
   }
   
-  public void testUserDisabledACLForJobSubmission() throws IOException {
+  public void testUserDisabledACLForJobSubmission() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     JobConf conf = setupConf("mapred.queue.default.acl-submit-job", 
                                 "3698-non-existent-user");
     verifyJobSubmission(conf, false);
   }
   
-  public void testDisabledACLForNonDefaultQueue() throws IOException {
+  public void testDisabledACLForNonDefaultQueue() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     // allow everyone in default queue
     JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
     // setup a different queue
@@ -105,13 +110,14 @@
     verifyJobSubmission(conf, false, "q1");
   }
   
-  public void testSubmissionToInvalidQueue() throws IOException{
+  public void testSubmissionToInvalidQueue() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     JobConf conf = new JobConf();
     conf.set("mapred.queue.names","default");
     setUpCluster(conf);
     String queueName = "q1";
     try {
-      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
+      Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
     } catch (IOException ioe) {      
        assertTrue(ioe.getMessage().contains("Queue \"" + queueName + "\" does not exist"));
        return;
@@ -122,7 +128,7 @@
   }
   
   public void testEnabledACLForNonDefaultQueue() throws IOException,
-                                                          LoginException {
+      LoginException, InterruptedException, ClassNotFoundException {
     // login as self...
     UserGroupInformation ugi = UnixUserGroupInformation.login();
     String userName = ugi.getUserName();
@@ -137,7 +143,8 @@
   }
   
   public void testUserEnabledACLForJobSubmission() 
-                                    throws IOException, LoginException {
+      throws IOException, LoginException, 
+             InterruptedException, ClassNotFoundException {
     // login as self...
     UserGroupInformation ugi = UnixUserGroupInformation.login();
     String userName = ugi.getUserName();
@@ -148,7 +155,8 @@
   }
   
   public void testGroupsEnabledACLForJobSubmission() 
-                                    throws IOException, LoginException {
+      throws IOException, LoginException, 
+             InterruptedException, ClassNotFoundException {
     // login as self, get one group, and add in allowed list.
     UserGroupInformation ugi = UnixUserGroupInformation.login();
     String[] groups = ugi.getGroupNames();
@@ -160,23 +168,27 @@
     verifyJobSubmission(conf, true);
   }
   
-  public void testAllEnabledACLForJobKill() throws IOException {
+  public void testAllEnabledACLForJobKill() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
     verifyJobKill(conf, true);
   }
 
-  public void testAllDisabledACLForJobKill() throws IOException {
+  public void testAllDisabledACLForJobKill() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
     verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
   }
   
-  public void testOwnerAllowedForJobKill() throws IOException {
+  public void testOwnerAllowedForJobKill() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
                                               "junk-user");
     verifyJobKill(conf, true);
   }
   
-  public void testUserDisabledACLForJobKill() throws IOException {
+  public void testUserDisabledACLForJobKill() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     //setup a cluster allowing a user to submit
     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
                                               "dummy-user");
@@ -184,7 +196,7 @@
   }
   
   public void testUserEnabledACLForJobKill() throws IOException, 
-                                                    LoginException {
+      LoginException, InterruptedException, ClassNotFoundException {
     // login as self...
     UserGroupInformation ugi = UnixUserGroupInformation.login();
     String userName = ugi.getUserName();
@@ -193,7 +205,8 @@
     verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
   }
   
-  public void testUserDisabledForJobPriorityChange() throws IOException {
+  public void testUserDisabledForJobPriorityChange() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
                               "junk-user");
     verifyJobPriorityChangeAsOtherUser(conf, false, 
@@ -379,12 +392,13 @@
   }
   
   private void verifyJobSubmission(JobConf conf, boolean shouldSucceed) 
-                                              throws IOException {
+      throws IOException, InterruptedException, ClassNotFoundException {
     verifyJobSubmission(conf, shouldSucceed, "default");
   }
 
   private void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
-      String queue) throws IOException {
+                                   String queue) 
+      throws IOException, InterruptedException, ClassNotFoundException {
     setUpCluster(conf);
     try {
       runAndVerifySubmission(conf, shouldSucceed, queue, null);
@@ -395,9 +409,9 @@
 
   private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
       String queue, String userInfo)
-      throws IOException {
+      throws IOException, InterruptedException, ClassNotFoundException {
     try {
-      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, userInfo, queue);
+      Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
       if (shouldSucceed) {
         assertTrue(rjob.isSuccessful());
       } else {
@@ -428,10 +442,10 @@
 }
 
   private void verifyJobKill(JobConf conf, boolean shouldSucceed) 
-                                      throws IOException {
+      throws IOException, InterruptedException, ClassNotFoundException {
     setUpCluster(conf);
     try {
-      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
+      Job rjob = submitSleepJob(1, 1, 1000, 1000, false);
       assertFalse(rjob.isComplete());
       while(rjob.mapProgress() == 0.0f) {
         try {
@@ -441,7 +455,7 @@
         }
       }
       rjob.killJob();
-      while(rjob.cleanupProgress() == 0.0f) {
+      while (!rjob.isComplete()) {
         try {
           Thread.sleep(10);  
         } catch (InterruptedException ie) {
@@ -449,7 +463,7 @@
         }
       }
       if (shouldSucceed) {
-        assertTrue(rjob.isComplete());
+        assertTrue(!rjob.isSuccessful());
       } else {
         fail("Job kill should have failed.");
       }
@@ -470,16 +484,18 @@
   
   private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
                                         String otherUserInfo) 
-                        throws IOException {
+  throws IOException, InterruptedException, ClassNotFoundException {
     setUpCluster(conf);
     try {
       // submit a job as another user.
       String userInfo = otherUserInfo;
-      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
-      assertFalse(rjob.isComplete());
+      Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
+      assertFalse(job.isComplete());
 
       //try to kill as self
       try {
+        JobClient jc = new JobClient(miniMRCluster.createJobConf());
+        RunningJob rjob = jc.getJob((JobID)job.getID());
         rjob.killJob();
         if (!shouldSucceed) {
           fail("should fail kill operation");  
@@ -495,7 +511,7 @@
                                     "ADMINISTER_JOBS on queue default"));
       }
       //wait for job to complete on its own
-      while (!rjob.isComplete()) {
+      while (!job.isComplete()) {
         try {
           Thread.sleep(1000);
         } catch (InterruptedException ie) {
@@ -509,16 +525,18 @@
   
   private void verifyJobPriorityChangeAsOtherUser(JobConf conf, 
                           boolean shouldSucceed, String otherUserInfo)
-                            throws IOException {
+  throws IOException, InterruptedException, ClassNotFoundException {
     setUpCluster(conf);
     try {
       // submit job as another user.
       String userInfo = otherUserInfo;
-      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
-      assertFalse(rjob.isComplete());
+      Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
+      assertFalse(job.isComplete());
       
       // try to change priority as self
       try {
+        JobClient jc = new JobClient(miniMRCluster.createJobConf());
+        RunningJob rjob = jc.getJob((JobID)job.getID());
         rjob.setJobPriority("VERY_LOW");
         if (!shouldSucceed) {
           fail("changing priority should fail.");
@@ -531,7 +549,7 @@
                                     "ADMINISTER_JOBS on queue default"));
       }
       //wait for job to complete on its own
-      while (!rjob.isComplete()) {
+      while (!job.isComplete()) {
         try {
           Thread.sleep(1000);
         } catch (InterruptedException ie) {
@@ -556,49 +574,47 @@
     if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
   }
   
-  private RunningJob submitSleepJob(int numMappers, int numReducers, 
-                            long mapSleepTime, long reduceSleepTime,
-                            boolean shouldComplete) 
-                              throws IOException {
+  private Job submitSleepJob(int numMappers, int numReducers, 
+                             long mapSleepTime, long reduceSleepTime,
+                             boolean shouldComplete) 
+      throws IOException, InterruptedException, ClassNotFoundException {
     return submitSleepJob(numMappers, numReducers, mapSleepTime,
                           reduceSleepTime, shouldComplete, null);
   }
   
-  private RunningJob submitSleepJob(int numMappers, int numReducers, 
-                                      long mapSleepTime, long reduceSleepTime,
-                                      boolean shouldComplete, String userInfo) 
-                                            throws IOException {
+  private Job submitSleepJob(int numMappers, int numReducers, 
+                             long mapSleepTime, long reduceSleepTime,
+                             boolean shouldComplete, String userInfo) 
+  throws IOException, InterruptedException, ClassNotFoundException {
     return submitSleepJob(numMappers, numReducers, mapSleepTime, 
                           reduceSleepTime, shouldComplete, userInfo, null);
   }
 
-  private RunningJob submitSleepJob(int numMappers, int numReducers, 
-                                    long mapSleepTime, long reduceSleepTime,
-                                    boolean shouldComplete, String userInfo,
-                                    String queueName) 
-                                      throws IOException {
-    JobConf clientConf = new JobConf();
+  private Job submitSleepJob(int numMappers, int numReducers, 
+                             long mapSleepTime, long reduceSleepTime,
+                             boolean shouldComplete, String userInfo,
+                             String queueName) 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration clientConf = new Configuration();
     clientConf.set("mapred.job.tracker", "localhost:"
         + miniMRCluster.getJobTrackerPort());
-    SleepJob job = new SleepJob();
-    job.setConf(clientConf);
-    clientConf = job.setupJobConf(numMappers, numReducers, 
-        mapSleepTime, (int)mapSleepTime/100,
-        reduceSleepTime, (int)reduceSleepTime/100);
-    if (queueName != null) {
-      clientConf.setQueueName(queueName);
-    }
-    JobConf jc = new JobConf(clientConf);
     if (userInfo != null) {
-      jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+      clientConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+    }
+    if (queueName != null) {
+      clientConf.set("mapred.job.queue.name", queueName);
     }
-    RunningJob rJob = null;
+    SleepJob sleep = new SleepJob();
+    sleep.setConf(clientConf);
+    Job job = sleep.createJob(numMappers, numReducers, 
+        mapSleepTime, (int)mapSleepTime/100,
+        reduceSleepTime, (int)reduceSleepTime/100);
     if (shouldComplete) {
-      rJob = JobClient.runJob(jc);  
+      job.waitForCompletion(false);  
     } else {
-      rJob = new JobClient(clientConf).submitJob(jc);
+      job.submit();
     }
-    return rJob;
+    return job;
   }
 
 }

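Note the bridge pattern the test now relies on: jobs are submitted through the new API, but kill and priority operations still go through the old JobClient, keyed by a downcast of the new JobID. A condensed sketch of how such a helper might sit inside this test class (submitSleepJob and miniMRCluster are the test's own members; the helper name is illustrative):

    private void killAsSelf(String userInfo) throws Exception {
      // Submit through the new API without waiting for completion.
      Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
      // Bridge to the old API: the new org.apache.hadoop.mapreduce.JobID
      // downcasts to the org.apache.hadoop.mapred.JobID JobClient expects.
      JobClient jc = new JobClient(miniMRCluster.createJobConf());
      RunningJob rjob = jc.getJob((JobID) job.getID());
      rjob.killJob();              // or rjob.setJobPriority("VERY_LOW")
      while (!job.isComplete()) {  // poll the new-API handle for completion
        Thread.sleep(1000);
      }
    }
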
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue Apr 28 10:55:30 2009
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.StringUtils;
@@ -66,9 +67,9 @@
     }
   }
 
-  private void runSleepJob(JobConf conf) throws Exception {
+  private int runSleepJob(JobConf conf) throws Exception {
     String[] args = { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" };
-    ToolRunner.run(conf, new SleepJob(), args);
+    return ToolRunner.run(conf, new SleepJob(), args);
   }
 
   private void runAndCheckSuccessfulJob(JobConf conf)
@@ -87,16 +88,15 @@
     Matcher mat = null;
 
     // Start the job.
-    boolean success = true;
+    int ret;
     try {
-      runSleepJob(conf);
-      success = true;
+      ret = runSleepJob(conf);
     } catch (Exception e) {
-      success = false;
+      ret = 1;
     }
 
     // Job has to succeed
-    assertTrue(success);
+    assertTrue(ret == 0);
 
     JobClient jClient = new JobClient(conf);
     JobStatus[] jStatus = jClient.getAllJobs();
@@ -279,16 +279,15 @@
         + nn.getNameNodeAddress().getPort());
 
     // Start the job.
-    boolean success = true;
+    int ret = 0;
     try {
-      runSleepJob(conf);
-      success = true;
+      ret = runSleepJob(conf);
     } catch (Exception e) {
-      success = false;
+      ret = 1;
     }
 
     // Job has to fail
-    assertFalse(success);
+    assertTrue(ret != 0);
 
     JobClient jClient = new JobClient(conf);
     JobStatus[] jStatus = jClient.getAllJobs();
@@ -382,12 +381,13 @@
     SleepJob sleepJob = new SleepJob();
     sleepJob.setConf(conf);
     // Start the job
-    RunningJob job =
-        jClient.submitJob(sleepJob.setupJobConf(1, 1, 5000, 1, 1000, 1));
+    Job job = sleepJob.createJob(1, 1, 5000, 1, 1000, 1);
+    job.submit();
     boolean TTOverFlowMsgPresent = false;
     while (true) {
       // Set-up tasks are the first to be launched.
-      TaskReport[] setUpReports = jt.getSetupTaskReports(job.getID());
+      TaskReport[] setUpReports = jClient.getSetupTaskReports(
+                                    (org.apache.hadoop.mapred.JobID)job.getID());
       for (TaskReport tr : setUpReports) {
         String[] diag = tr.getDiagnostics();
         for (String str : diag) {

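Because SleepJob.run() now reports the real outcome (0 on success, 1 when waitForCompletion returns false, 2 on a usage error, per the SleepJob diff above), these tests can assert on the ToolRunner exit code instead of on whether an exception was thrown. A minimal sketch of that check (the class name is illustrative):

    import org.apache.hadoop.examples.SleepJob;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.ToolRunner;

    public class ExitCodeSketch {
      public static void main(String[] args) throws Exception {
        String[] sleepArgs =
            { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" };
        int ret = ToolRunner.run(new JobConf(), new SleepJob(), sleepArgs);
        // 0 => job succeeded; non-zero => job failure or bad usage.
        System.exit(ret);
      }
    }
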
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java Tue Apr 28 10:55:30 2009
@@ -20,12 +20,11 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob.SleepInputFormat;
+import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 
 import junit.framework.TestCase;
@@ -37,16 +36,16 @@
   final Path inDir = new Path("/testing");
   final Path outDir = new Path("/output");
 
-  public static class SleepJobFailOnHost extends MapReduceBase
-    implements Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+  public static class FailOnHostMapper extends MapReduceBase
+    implements Mapper<Text, Text, Text, Text> {
     String hostname = "";
     
     public void configure(JobConf job) {
       this.hostname = job.get("slave.host.name");
     }
     
-    public void map(IntWritable key, IntWritable value,
-                    OutputCollector<IntWritable, NullWritable> output,
+    public void map(Text key, Text value,
+                    OutputCollector<Text, Text> output,
                     Reporter reporter)
     throws IOException {
       if (this.hostname.equals(hosts[0])) {
@@ -81,11 +80,11 @@
     job.setInt("mapred.max.tracker.failures", 1);
     job.setNumMapTasks(30);
     job.setNumReduceTasks(0);
-    job.setMapperClass(SleepJobFailOnHost.class);
-    job.setMapOutputKeyClass(IntWritable.class);
-    job.setMapOutputValueClass(NullWritable.class);
+    job.setMapperClass(FailOnHostMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
     job.setOutputFormat(NullOutputFormat.class);
-    job.setInputFormat(SleepInputFormat.class);
+    job.setInputFormat(RandomInputFormat.class);
     FileInputFormat.setInputPaths(job, inDir);
     FileOutputFormat.setOutputPath(job, outDir);