You are viewing a plain text version of this content. The canonical link for it is not preserved in this plain text copy.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/06/01 13:36:47 UTC

svn commit: r950021 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

Author: vinodkv
Date: Tue Jun  1 11:36:47 2010
New Revision: 950021

URL: http://svn.apache.org/viewvc?rev=950021&view=rev
Log:
MAPREDUCE-1773. streaming doesn't support jobclient.output.filter. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=950021&r1=950020&r2=950021&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jun  1 11:36:47 2010
@@ -29,6 +29,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1798. Names the configuration keys for the Kerberos
     principals better. (Boris Shkolnik via ddas)
 
+    MAPREDUCE-1773. streaming doesn't support jobclient.output.filter.
+    (Amareshwari Sriramadasu via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=950021&r1=950020&r2=950021&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Jun  1 11:36:47 2010
@@ -925,40 +925,16 @@ public class StreamJob implements Tool {
 
     // if jobConf_ changes must recreate a JobClient
     jc_ = new JobClient(jobConf_);
-    boolean error = true;
     running_ = null;
-    String lastReport = null;
     try {
       running_ = jc_.submitJob(jobConf_);
       jobId_ = running_.getID();
-
-      LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));
-      LOG.info("Running job: " + jobId_);
       jobInfo();
-
-      while (!running_.isComplete()) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-        }
-        running_ = jc_.getJob(jobId_);
-        String report = null;
-        report = " map " + Math.round(running_.mapProgress() * 100) + "%  reduce "
-          + Math.round(running_.reduceProgress() * 100) + "%";
-
-        if (!report.equals(lastReport)) {
-          LOG.info(report);
-          lastReport = report;
-        }
+      if (!jc_.monitorAndPrintJob(jobConf_, running_)) {
+        LOG.error("Job not Successful!");
+        return 1;
       }
-      if (!running_.isSuccessful()) {
-        jobInfo();
-	LOG.error("Job not Successful!");
-	return 1;
-      }
-      LOG.info("Job complete: " + jobId_);
-      LOG.info("Output: " + output_);
-      error = false;
+      LOG.info("Output directory: " + output_);
     } catch(FileNotFoundException fe) {
       LOG.error("Error launching job , bad input path : " + fe.getMessage());
       return 2;
@@ -972,11 +948,10 @@ public class StreamJob implements Tool {
     } catch(IOException ioe) {
       LOG.error("Error Launching job : " + ioe.getMessage());
       return 5;
+    } catch (InterruptedException ie) {
+      LOG.error("Error monitoring job : " + ie.getMessage());
+      return 6;
     } finally {
-      if (error && (running_ != null)) {
-        LOG.info("killJob...");
-        running_.killJob();
-      }
       jc_.close();
     }
     return 0;