You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2009/04/01 22:32:14 UTC

svn commit: r761049 - in /hadoop/core/branches/branch-0.20: ./ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/

Author: omalley
Date: Wed Apr  1 20:32:13 2009
New Revision: 761049

URL: http://svn.apache.org/viewvc?rev=761049&view=rev
Log:
HADOOP-5577. Add a verbose flag to mapreduce.Job.waitForCompletion to get
the running job's information printed to the user's stdout as it runs.
(omalley)

Modified:
    hadoop/core/branches/branch-0.20/   (props changed)
    hadoop/core/branches/branch-0.20/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/SecondarySort.java
    hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/WordCount.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr  1 20:32:13 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755960,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755960,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783,761046

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=761049&r1=761048&r2=761049&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Wed Apr  1 20:32:13 2009
@@ -828,6 +828,10 @@
     updated before the JVM exits. Also makes the update to log.index atomic.
     (Ravi Gummadi via ddas)
 
+    HADOOP-5577. Add a verbose flag to mapreduce.Job.waitForCompletion to get
+    the running job's information printed to the user's stdout as it runs.
+    (omalley)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr  1 20:32:13 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783,761046

Modified: hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/SecondarySort.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/SecondarySort.java?rev=761049&r1=761048&r2=761049&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/SecondarySort.java (original)
+++ hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/SecondarySort.java Wed Apr  1 20:32:13 2009
@@ -233,7 +233,7 @@
     
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-    System.exit(job.waitForCompletion() ? 0 : 1);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
 
 }

Modified: hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/WordCount.java?rev=761049&r1=761048&r2=761049&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/WordCount.java Wed Apr  1 20:32:13 2009
@@ -64,6 +64,6 @@
     job.setOutputValueClass(IntWritable.class);
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-    System.exit(job.waitForCompletion() ? 0 : 1);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
 }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=761049&r1=761048&r2=761049&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobClient.java Wed Apr  1 20:32:13 2009
@@ -1245,126 +1245,107 @@
    * complete.
    * 
    * @param job the job configuration.
-   * @throws IOException
+   * @throws IOException if the job fails
    */
   public static RunningJob runJob(JobConf job) throws IOException {
     JobClient jc = new JobClient(job);
-    boolean error = true;
-    RunningJob running = null;
-    String lastReport = null;
-    final int MAX_RETRIES = 5;
-    int retries = MAX_RETRIES;
-    TaskStatusFilter filter;
+    RunningJob rj = jc.submitJob(job);
     try {
-      filter = getTaskOutputFilter(job);
-    } catch(IllegalArgumentException e) {
-      LOG.warn("Invalid Output filter : " + e.getMessage() + 
-               " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
-      throw e;
+      if (!jc.monitorAndPrintJob(job, rj)) {
+        throw new IOException("Job failed!");
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
     }
-    try {
-      running = jc.submitJob(job);
-      JobID jobId = running.getID();
-      LOG.info("Running job: " + jobId);
-      int eventCounter = 0;
-      boolean profiling = job.getProfileEnabled();
-      Configuration.IntegerRanges mapRanges = job.getProfileTaskRange(true);
-      Configuration.IntegerRanges reduceRanges = job.getProfileTaskRange(false);
-        
-      while (true) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {}
-        try {
-          if (running.isComplete()) {
-            break;
-          }
-          running = jc.getJob(jobId);
-          if (running == null) {
-            throw new IOException("Unable to fetch job status from server.");
-          }
-          String report = 
-            (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
-             " reduce " + 
-             StringUtils.formatPercent(running.reduceProgress(), 0));
-          if (!report.equals(lastReport)) {
-            LOG.info(report);
-            lastReport = report;
-          }
-            
-          TaskCompletionEvent[] events = 
-            running.getTaskCompletionEvents(eventCounter); 
-          eventCounter += events.length;
-          for(TaskCompletionEvent event : events){
-            TaskCompletionEvent.Status status = event.getTaskStatus();
-            if (profiling && 
-                (status == TaskCompletionEvent.Status.SUCCEEDED ||
-                 status == TaskCompletionEvent.Status.FAILED) &&
+    return rj;
+  }
+  
+  /**
+   * Monitor a job and print status in real-time as progress is made and tasks 
+   * fail.
+   * @param conf the job's configuration
+   * @param job the job to track
+   * @return true if the job succeeded
+   * @throws IOException if communication to the JobTracker fails
+   */
+  public boolean monitorAndPrintJob(JobConf conf, 
+                                    RunningJob job
+  ) throws IOException, InterruptedException {
+    String lastReport = null;
+    TaskStatusFilter filter;
+    filter = getTaskOutputFilter(conf);
+    JobID jobId = job.getID();
+    LOG.info("Running job: " + jobId);
+    int eventCounter = 0;
+    boolean profiling = conf.getProfileEnabled();
+    Configuration.IntegerRanges mapRanges = conf.getProfileTaskRange(true);
+    Configuration.IntegerRanges reduceRanges = conf.getProfileTaskRange(false);
+
+    while (!job.isComplete()) {
+      Thread.sleep(1000);
+      String report = 
+        (" map " + StringUtils.formatPercent(job.mapProgress(), 0)+
+            " reduce " + 
+            StringUtils.formatPercent(job.reduceProgress(), 0));
+      if (!report.equals(lastReport)) {
+        LOG.info(report);
+        lastReport = report;
+      }
+
+      TaskCompletionEvent[] events = 
+        job.getTaskCompletionEvents(eventCounter); 
+      eventCounter += events.length;
+      for(TaskCompletionEvent event : events){
+        TaskCompletionEvent.Status status = event.getTaskStatus();
+        if (profiling && 
+            (status == TaskCompletionEvent.Status.SUCCEEDED ||
+                status == TaskCompletionEvent.Status.FAILED) &&
                 (event.isMap ? mapRanges : reduceRanges).
-                   isIncluded(event.idWithinJob())) {
-              downloadProfile(event);
-            }
-            switch(filter){
-            case NONE:
-              break;
-            case SUCCEEDED:
-              if (event.getTaskStatus() == 
-                TaskCompletionEvent.Status.SUCCEEDED){
-                LOG.info(event.toString());
-                displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
-              }
-              break; 
-            case FAILED:
-              if (event.getTaskStatus() == 
-                TaskCompletionEvent.Status.FAILED){
-                LOG.info(event.toString());
-                // Displaying the task diagnostic information
-                TaskAttemptID taskId = event.getTaskAttemptId();
-                String[] taskDiagnostics = 
-                  jc.jobSubmitClient.getTaskDiagnostics(taskId); 
-                if (taskDiagnostics != null) {
-                  for(String diagnostics : taskDiagnostics){
-                    System.err.println(diagnostics);
-                  }
-                }
-                // Displaying the task logs
-                displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
-              }
-              break; 
-            case KILLED:
-              if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
-                LOG.info(event.toString());
+                isIncluded(event.idWithinJob())) {
+          downloadProfile(event);
+        }
+        switch(filter){
+        case NONE:
+          break;
+        case SUCCEEDED:
+          if (event.getTaskStatus() == 
+            TaskCompletionEvent.Status.SUCCEEDED){
+            LOG.info(event.toString());
+            displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+          }
+          break; 
+        case FAILED:
+          if (event.getTaskStatus() == 
+            TaskCompletionEvent.Status.FAILED){
+            LOG.info(event.toString());
+            // Displaying the task diagnostic information
+            TaskAttemptID taskId = event.getTaskAttemptId();
+            String[] taskDiagnostics = 
+              jobSubmitClient.getTaskDiagnostics(taskId); 
+            if (taskDiagnostics != null) {
+              for(String diagnostics : taskDiagnostics){
+                System.err.println(diagnostics);
               }
-              break; 
-            case ALL:
-              LOG.info(event.toString());
-              displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
-              break;
             }
+            // Displaying the task logs
+            displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
           }
-          retries = MAX_RETRIES;
-        } catch (IOException ie) {
-          if (--retries == 0) {
-            LOG.warn("Final attempt failed, killing job.");
-            throw ie;
+          break; 
+        case KILLED:
+          if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
+            LOG.info(event.toString());
           }
-          LOG.info("Communication problem with server: " +
-                   StringUtils.stringifyException(ie));
+          break; 
+        case ALL:
+          LOG.info(event.toString());
+          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+          break;
         }
       }
-      if (!running.isSuccessful()) {
-        throw new IOException("Job failed!");
-      }
-      LOG.info("Job complete: " + jobId);
-      running.getCounters().log(LOG);
-      error = false;
-    } finally {
-      if (error && (running != null)) {
-        running.killJob();
-      }
-      jc.close();
     }
-    return running;
+    LOG.info("Job complete: " + jobId);
+    job.getCounters().log(LOG);
+    return job.isSuccessful();
   }
 
   static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=761049&r1=761048&r2=761049&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java Wed Apr  1 20:32:13 2009
@@ -38,7 +38,7 @@
 public class Job extends JobContext {  
   public static enum JobState {DEFINE, RUNNING};
   private JobState state = JobState.DEFINE;
-  private JobClient jobTracker;
+  private JobClient jobClient;
   private RunningJob info;
 
   public Job() throws IOException {
@@ -47,7 +47,7 @@
 
   public Job(Configuration conf) throws IOException {
     super(conf, null);
-    jobTracker = new JobClient((JobConf) getConfiguration());
+    jobClient = new JobClient((JobConf) getConfiguration());
   }
 
   public Job(Configuration conf, String jobName) throws IOException {
@@ -429,22 +429,29 @@
                               ClassNotFoundException {
     ensureState(JobState.DEFINE);
     setUseNewAPI();
-    info = jobTracker.submitJobInternal(conf);
+    info = jobClient.submitJobInternal(conf);
     state = JobState.RUNNING;
    }
   
   /**
    * Submit the job to the cluster and wait for it to finish.
+   * @param verbose print the progress to the user
    * @return true if the job succeeded
    * @throws IOException thrown if the communication with the 
    *         <code>JobTracker</code> is lost
    */
-  public boolean waitForCompletion() throws IOException, InterruptedException,
+  public boolean waitForCompletion(boolean verbose
+                                   ) throws IOException, InterruptedException,
                                             ClassNotFoundException {
     if (state == JobState.DEFINE) {
       submit();
     }
-    info.waitForCompletion();
+    if (verbose) {
+      jobClient.monitorAndPrintJob(conf, info);
+    } else {
+      info.waitForCompletion();
+    }
     return isSuccessful();
   }
+  
 }

Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=761049&r1=761048&r2=761049&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Wed Apr  1 20:32:13 2009
@@ -111,7 +111,7 @@
     job.setOutputValueClass(IntWritable.class);
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
-    assertTrue(job.waitForCompletion());
+    assertTrue(job.waitForCompletion(false));
     String out = readFile("out/part-r-00000");
     System.out.println(out);
     assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
@@ -156,7 +156,7 @@
     
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
-    assertTrue(job.waitForCompletion());
+    assertTrue(job.waitForCompletion(true));
     String out = readFile("out/part-r-00000");
     assertEquals("------------------------------------------------\n" +
                  "-3\t23\n" +