You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/08/15 00:14:07 UTC

svn commit: r565946 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/

Author: omalley
Date: Tue Aug 14 15:14:06 2007
New Revision: 565946

URL: http://svn.apache.org/viewvc?view=rev&rev=565946
Log:
HADOOP-1663. Fix streaming to return a non-zero exit code when it fails. 
Contributed by Lohit Renu.

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=565946&r1=565945&r2=565946
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Aug 14 15:14:06 2007
@@ -529,6 +529,8 @@
 155. HADOOP-1714.  Fix TestDFSUpgradeFromImage to work on Windows.
      (Raghu Angadi via nigel)
 
+156. HADOOP-1663.  Return a non-zero exit code if streaming fails. (Lohit Renu
+     via omalley)
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?view=diff&rev=565946&r1=565945&r2=565946
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Tue Aug 14 15:14:06 2007
@@ -28,7 +28,12 @@
 
   public static void main(String[] args) throws IOException {
     boolean mayExit = true;
+    int returnStatus = 0;
     StreamJob job = new StreamJob(args, mayExit);
-    job.go();
+    returnStatus = job.go();
+    if (returnStatus != 0) {
+      System.err.println("Streaming Job Failed!");
+      System.exit(returnStatus);
+    }
   }
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=565946&r1=565945&r2=565946
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Aug 14 15:14:06 2007
@@ -98,7 +98,7 @@
    * to the jobtracker
    * @throws IOException
    */
-  public void go() throws IOException {
+  public int go() throws IOException {
     init();
 
     preProcessArgs();
@@ -106,7 +106,7 @@
     postProcessArgs();
 
     setJobConf();
-    submitAndMonitorJob();
+    return submitAndMonitorJob();
   }
   
   protected void init() {
@@ -868,7 +868,7 @@
   }
 
   // Based on JobClient
-  public void submitAndMonitorJob() throws IOException {
+  public int submitAndMonitorJob() throws IOException {
 
     if (jar_ != null && isLocalHadoop()) {
       // getAbs became required when shell and subvm have different working dirs...
@@ -906,28 +906,33 @@
       }
       if (!running_.isSuccessful()) {
         jobInfo();
-        throw new IOException("Job not Successful!");
+	LOG.error("Job not Successful!");
+	return 1;
       }
       LOG.info("Job complete: " + jobId_);
       LOG.info("Output: " + output_);
       error = false;
-    } catch(FileNotFoundException fe){
+    } catch(FileNotFoundException fe) {
       LOG.error("Error launching job , bad input path : " + fe.getMessage());
-    }catch(InvalidJobConfException je){
+      return 2;
+    } catch(InvalidJobConfException je) {
       LOG.error("Error launching job , Invalid job conf : " + je.getMessage());
-    }catch(FileAlreadyExistsException fae){
+      return 3;
+    } catch(FileAlreadyExistsException fae) {
       LOG.error("Error launching job , Output path already exists : " 
                 + fae.getMessage());
-    }catch(IOException ioe){
+      return 4;
+    } catch(IOException ioe) {
       LOG.error("Error Launching job : " + ioe.getMessage());
-    }
-    finally {
+      return 5;
+    } finally {
       if (error && (running_ != null)) {
         LOG.info("killJob...");
         running_.killJob();
       }
       jc_.close();
     }
+    return 0;
   }
   /** Support -jobconf x=y x1=y1 type options **/
   class MultiPropertyOption extends PropertyOption{

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java?view=auto&rev=565946
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java Tue Aug 14 15:14:06 2007
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests if hadoopStreaming returns Exception 
+ * on failure when submitted an invalid/failed job
+ * The test case provides an invalid input file for map/reduce job as
+ * a unit test case
+ */
+public class TestStreamingFailure extends TestStreaming
+{
+
+  protected File INVALID_INPUT_FILE;// = new File("invalid_input.txt");
+  private StreamJob job;
+
+  public TestStreamingFailure() throws IOException
+  {
+    INVALID_INPUT_FILE = new File("invalid_input.txt");
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INVALID_INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-combiner", combine,
+      "-reducer", reduce,
+      //"-verbose",
+      //"-jobconf", "stream.debug=set"
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  public void testCommandLine()
+  {
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+
+      boolean mayExit = false;
+      int returnStatus = 0;
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      job = new StreamJob(genArgs(), mayExit);      
+      returnStatus = job.go();
+      assertEquals("Streaming Job Failure code expected", 5, returnStatus);
+    } catch(Exception e) {
+      // Expecting an exception
+    } finally {
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+      new TestStreamingFailure().testCommandLine();
+  }
+}