Posted to commits@pig.apache.org by ol...@apache.org on 2009/05/15 19:14:35 UTC

svn commit: r775239 - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backe...

Author: olga
Date: Fri May 15 17:14:35 2009
New Revision: 775239

URL: http://svn.apache.org/viewvc?rev=775239&view=rev
Log:
PIG-781: Error reporting for failed MR jobs (hagleitn via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri May 15 17:14:35 2009
@@ -76,6 +76,8 @@
 
 PIG-799: Unit tests on windows are failing after multiquery commit (daijy)
 
+PIG-781: Error reporting for failed MR jobs (hagleitn via olgan)
+
 Release 0.2.0
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Fri May 15 17:14:35 2009
@@ -99,12 +99,13 @@
         opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
         opts.registerOpt('o', "hod", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('p', "param", CmdLineParser.ValueExpected.OPTIONAL);
-        opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.OPTIONAL);
         opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('t', "optimizer_off", CmdLineParser.ValueExpected.REQUIRED);
         opts.registerOpt('v', "verbose", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('w', "warning", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('F', "stop_on_failure", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+        opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.NOT_ACCEPTED);
 
         ExecMode mode = ExecMode.UNKNOWN;
         String file = null;
@@ -122,8 +123,12 @@
         //by default warning aggregation is on
         properties.setProperty("aggregate.warning", ""+true);
 
+        //by default multiquery optimization is on
         properties.setProperty("opt.multiquery", ""+true);
 
+        //by default we keep going on error on the backend
+        properties.setProperty("stop.on.failure", ""+false);
+
         char opt;
         while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
             switch (opt) {
@@ -164,6 +169,10 @@
                 file = opts.getValStr();
                 break;
 
+            case 'F':
+                properties.setProperty("stop.on.failure", ""+true);
+                break;
+
             case 'h':
                 usage();
                 return;
@@ -359,8 +368,9 @@
             logFileName = validateLogFile(logFileName, remainders[0]);
             pigContext.getProperties().setProperty("pig.logfile", logFileName);
 
-            if (!debug)
+            if (!debug) {
                 new File(substFile).deleteOnExit();
+            }
 
             // Set job name based on name of the script
             pigContext.getProperties().setProperty(PigContext.JOB_NAME, 
@@ -369,8 +379,21 @@
 
             grunt = new Grunt(pin, pigContext);
             gruntCalled = true;
-            grunt.exec();
-            rc = 0;
+            int[] results = grunt.exec();
+            if (results[1] == 0) {
+                // no failed jobs
+                rc = 0;
+            }
+            else {
+                if (results[0] == 0) {
+                    // no succeeded jobs
+                    rc = 2;
+                }
+                else {
+                    // some jobs have failed
+                    rc = 3;
+                }
+            }
             return;
         }
 
@@ -387,6 +410,7 @@
         } else {
             rc = 2;
         }
+
         if(!gruntCalled) {
         	LogUtils.writeLog(pe, logFileName, log, verbose);
         }
@@ -495,30 +519,31 @@
 public static void usage()
 {
 	System.out.println("\n"+getVersionString()+"\n");
-    System.out.println("USAGE: Pig [options] [-] : Run interactively in grunt shell.");
-    System.out.println("       Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).");
-    System.out.println("       Pig [options] [-f[ile]] file : Run cmds found in file.");
-    System.out.println("  options include:");
-    System.out.println("    -4, -log4jconf log4j configuration file, overrides log conf");
-    System.out.println("    -b, -brief brief logging (no timestamps)");
-    System.out.println("    -c, -cluster clustername, kryptonite is default");
-    System.out.println("    -d, -debug debug level, INFO is default");
-    System.out.println("    -e, -execute commands to execute (within quotes)");
-    System.out.println("    -f, -file path to the script to execute");
-    System.out.println("    -h, -help display this message");
-    System.out.println("    -i, -version display version information");
-    System.out.println("    -j, -jar jarfile load jarfile"); 
-    System.out.println("    -l, -logfile path to client side log file; current working directory is default");
-    System.out.println("    -m, -param_file path to the parameter file");
-    System.out.println("    -o, -hod read hod server from system property ssh.gateway");
-    System.out.println("    -p, -param key value pair of the form param=val");
-    System.out.println("    -r, -dryrun CmdLineParser.ValueExpected.NOT_ACCEPTED");
-    System.out.println("    -t, -optimizer_off optimizer rule name, turn optimizer off for this rule; use all to turn all rules off, optimizer is turned on by default");
-    System.out.println("    -v, -verbose print all error messages to screen");
-    System.out.println("    -w, -warning turn warning on; also turns warning aggregation off");
-    System.out.println("    -x, -exectype local|mapreduce, mapreduce is default");
+        System.out.println("USAGE: Pig [options] [-] : Run interactively in grunt shell.");
+        System.out.println("       Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).");
+        System.out.println("       Pig [options] [-f[ile]] file : Run cmds found in file.");
+        System.out.println("  options include:");
+        System.out.println("    -4, -log4jconf log4j configuration file, overrides log conf");
+        System.out.println("    -b, -brief brief logging (no timestamps)");
+        System.out.println("    -c, -cluster clustername, kryptonite is default");
+        System.out.println("    -d, -debug debug level, INFO is default");
+        System.out.println("    -e, -execute commands to execute (within quotes)");
+        System.out.println("    -f, -file path to the script to execute");
+        System.out.println("    -h, -help display this message");
+        System.out.println("    -i, -version display version information");
+        System.out.println("    -j, -jar jarfile load jarfile"); 
+        System.out.println("    -l, -logfile path to client side log file; current working directory is default");
+        System.out.println("    -m, -param_file path to the parameter file");
+        System.out.println("    -o, -hod read hod server from system property ssh.gateway");
+        System.out.println("    -p, -param key value pair of the form param=val");
+        System.out.println("    -r, -dryrun CmdLineParser.ValueExpected.NOT_ACCEPTED");
+        System.out.println("    -t, -optimizer_off optimizer rule name, turn optimizer off for this rule; use all to turn all rules off, optimizer is turned on by default");
+        System.out.println("    -v, -verbose print all error messages to screen");
+        System.out.println("    -w, -warning turn warning on; also turns warning aggregation off");
+        System.out.println("    -x, -exectype local|mapreduce, mapreduce is default");
 
-    System.out.println("    -M, -no_multiquery turn multiquery optimization off; Multiquery is on by default");
+        System.out.println("    -F, -stop_on_failure aborts execution on the first failed job; off by default");
+        System.out.println("    -M, -no_multiquery turn multiquery optimization off; Multiquery is on by default");
 }
 
 private static String validateLogFile(String logFileName, String scriptName) {
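
With this change the front end distinguishes three outcomes instead of a single
pass/fail: Grunt.exec() now hands back the number of succeeded and failed jobs
(see GruntParser below), and Main maps that pair onto the process return code.
A minimal sketch of that mapping; the helper name is hypothetical and only
condenses the logic added above:

    // results[0] = succeeded jobs, results[1] = failed jobs (as returned by grunt.exec())
    static int returnCode(int succeeded, int failed) {
        if (failed == 0)    return 0;   // every job succeeded
        if (succeeded == 0) return 2;   // every job failed
        return 3;                       // partial failure: some succeeded, some failed
    }

The new -F/-stop_on_failure switch simply sets the stop.on.failure property to
true, so the backend aborts on the first failed job instead of continuing.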

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri May 15 17:14:35 2009
@@ -24,6 +24,7 @@
 import java.io.PrintStream;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Enumeration;
@@ -227,10 +228,10 @@
      * @throws FrontendException
      * @throws ExecException
      */
-    public void executeBatch() throws FrontendException, ExecException {
+    public List<ExecJob> executeBatch() throws FrontendException, ExecException {
         if (!isMultiQuery) {
             // ignore if multiquery is off
-            return;
+            return new LinkedList<ExecJob>();
         }
 
         if (currDAG == null || !isBatchOn()) {
@@ -239,7 +240,7 @@
             throw new FrontendException(msg, errCode, PigException.INPUT);
         }
         
-        currDAG.execute();
+        return currDAG.execute();
     }
 
     /**
@@ -453,7 +454,11 @@
             // invocation of "execute" is synchronous!
 
             if (job.getStatus() == JOB_STATUS.COMPLETED) {
-                    return job.getResults();
+                return job.getResults();
+            } else if (job.getStatus() == JOB_STATUS.FAILED
+                       && job.getException() != null) {
+                // throw the backend exception in the failed case
+                throw job.getException();
             } else {
                 throw new IOException("Job terminated with anomalous status "
                     + job.getStatus().toString());
@@ -484,8 +489,9 @@
             String filename,
             String func) throws IOException {
 
-        if (!currDAG.getAliasOp().containsKey(id))
+        if (!currDAG.getAliasOp().containsKey(id)) {
             throw new IOException("Invalid alias: " + id);
+        }
 
         try {
             LogicalPlan lp = compileLp(id);
@@ -507,7 +513,11 @@
             }
             
             LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf);
-            return executeCompiledLogicalPlan(storePlan);
+            List<ExecJob> jobs = executeCompiledLogicalPlan(storePlan);
+            if (jobs.size() < 1) {
+                throw new IOException("Couldn't retrieve job.");
+            }
+            return jobs.get(0);
         } catch (Exception e) {
             int errCode = 1002;
             String msg = "Unable to store alias " + id;
@@ -734,30 +744,34 @@
         return lp;
     }
     
-    private ExecJob execute(String alias) throws FrontendException, ExecException {
+    private List<ExecJob> execute(String alias) throws FrontendException, ExecException {
         LogicalPlan typeCheckedLp = compileLp(alias);
 
         if (typeCheckedLp.size() == 0) {
-            return null;
+            return new LinkedList<ExecJob>();
         }
 
         LogicalOperator op = typeCheckedLp.getLeaves().get(0);
         if (op instanceof LODefine) {
             log.info("Skip execution of DEFINE only logical plan.");
-            return null;
+            return new LinkedList<ExecJob>();
         }
 
         return executeCompiledLogicalPlan(typeCheckedLp);
     }
     
-    private ExecJob executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
+    private List<ExecJob> executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
         PhysicalPlan pp = compilePp(compiledLp);
         // execute using appropriate engine
         FileLocalizer.clearDeleteOnFail();
-        ExecJob execJob = pigContext.getExecutionEngine().execute(pp, "execute");
-        if (execJob.getStatus()==ExecJob.JOB_STATUS.FAILED)
-            FileLocalizer.triggerDeleteOnFail();
-        return execJob;
+        List<ExecJob> execJobs = pigContext.getExecutionEngine().execute(pp, "execute");
+        for (ExecJob execJob: execJobs) {
+            if (execJob.getStatus()==ExecJob.JOB_STATUS.FAILED) {
+                FileLocalizer.triggerDeleteOnFail();
+                break;
+            }
+        }
+        return execJobs;
     }
 
     private LogicalPlan compileLp(
@@ -912,10 +926,11 @@
 
         boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); }
         
-        void execute() throws ExecException, FrontendException {
+        List<ExecJob> execute() throws ExecException, FrontendException {
             pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName);
-            PigServer.this.execute(null);
+            List<ExecJob> jobs = PigServer.this.execute(null);
             processedStores = storeOpTable.keySet().size();
+            return jobs;
         }
 
         void markAsExecuted() {
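
executeBatch() now returns one ExecJob per store instead of returning nothing,
and a FAILED job carries the backend exception. A minimal sketch of how a
client can inspect each result, assuming the constructor and batch methods used
by the tests in this patch; the queries and paths are illustrative only:

    import java.util.List;
    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.backend.executionengine.ExecJob;

    public class BatchErrorReport {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE, new Properties());
            pig.setBatchOn();
            pig.registerQuery("A = load 'in';");
            pig.registerQuery("store A into 'out';");

            List<ExecJob> jobs = pig.executeBatch();
            for (ExecJob job : jobs) {
                if (job.getStatus() == ExecJob.JOB_STATUS.FAILED) {
                    // getException() holds the backend error captured for this store, if any
                    Exception cause = job.getException();
                    System.err.println("store failed" + (cause != null ? ": " + cause : ""));
                }
            }
        }
    }

openIterator() benefits in the same way: when the job behind it fails, the
backend exception is rethrown rather than being folded into a generic
"anomalous status" message.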

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java Fri May 15 17:14:35 2009
@@ -101,4 +101,9 @@
     public void getSTDOut(OutputStream out) throws ExecException;
     
     public void getSTDError(OutputStream error) throws ExecException;
+
+    /**
+     * Get exceptions that happened during execution
+     */
+    public Exception getException();
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Fri May 15 17:14:35 2009
@@ -19,6 +19,7 @@
 package org.apache.pig.backend.executionengine;
 
 import java.io.PrintStream;
+import java.util.List;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.Map;
@@ -105,8 +106,8 @@
      * @param jobName Name of this plan, will be used to identify the plan
      * @throws ExecException
      */
-    public ExecJob execute(PhysicalPlan plan,
-                           String jobName) throws ExecException;
+    public List<ExecJob> execute(PhysicalPlan plan,
+                                 String jobName) throws ExecException;
 
     /**
      * Execute the physical plan in non-blocking mode
@@ -115,8 +116,8 @@
      * @param jobName Name of this plan, will be used to identify the plan
      * @throws ExecException
      */
-    public ExecJob submit(PhysicalPlan plan,
-                          String jobName) throws ExecException;
+    public List<ExecJob> submit(PhysicalPlan plan,
+                                      String jobName) throws ExecException;
 
     /**
      * Explain executor specific information.

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri May 15 17:14:35 2009
@@ -34,6 +34,9 @@
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.Collection;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -72,9 +75,9 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.shock.SSHSocketImplFactory;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.tools.pigstats.PigStats;
 
-
 public class HExecutionEngine implements ExecutionEngine {
     
     private static final String HOD_SERVER = "hod.server";
@@ -253,18 +256,25 @@
         }
     }
 
-    public ExecJob execute(PhysicalPlan plan,
-                           String jobName) throws ExecException {
-        try {
-            FileSpec spec = ExecTools.checkLeafIsStore(plan, pigContext);
+    public List<ExecJob> execute(PhysicalPlan plan,
+                                 String jobName) throws ExecException {
+        MapReduceLauncher launcher = new MapReduceLauncher();
+        List<ExecJob> jobs = new ArrayList<ExecJob>();
 
-            MapReduceLauncher launcher = new MapReduceLauncher();
+        try {
             PigStats stats = launcher.launchPig(plan, jobName, pigContext);
-            if(stats != null)
-                return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec, stats);
-            else
-                return new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
 
+            for (FileSpec spec: launcher.getSucceededFiles()) {
+                jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec, stats));
+            }
+
+            for (FileSpec spec: launcher.getFailedFiles()) {
+                HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, spec, stats);
+                j.setException(launcher.getError(spec));
+                jobs.add(j);
+            }
+
+            return jobs;
         } catch (Exception e) {
             // There are a lot of exceptions thrown by the launcher.  If this
             // is an ExecException, just let it through.  Else wrap it.
@@ -274,11 +284,13 @@
                 String msg = "Unexpected error during execution.";
                 throw new ExecException(msg, errCode, PigException.BUG, e);
             }
+        } finally {
+            launcher.reset();
         }
 
     }
 
-    public ExecJob submit(PhysicalPlan plan,
+    public List<ExecJob> submit(PhysicalPlan plan,
                           String jobName) throws ExecException {
         throw new UnsupportedOperationException();
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri May 15 17:14:35 2009
@@ -45,6 +45,7 @@
     protected JOB_STATUS status;
     protected PigContext pigContext;
     protected FileSpec outFileSpec;
+    protected Exception backendException;
     private PigStats stats;
     
     public HJob(JOB_STATUS status,
@@ -161,4 +162,12 @@
     public void getSTDError(OutputStream error) throws ExecException {
         throw new UnsupportedOperationException();
     }
+
+    public void setException(Exception e) {
+        backendException = e;
+    }
+
+    public Exception getException() {
+        return backendException;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri May 15 17:14:35 2009
@@ -108,13 +108,32 @@
 
     public static final String LOG_DIR = "_logs";
 
-    private List<Path> tmpPath;
-    private Path curTmpPath;
+    // A mapping of job to pair of store locations and tmp locations for that job
+    private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
 
     public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
         this.pigContext = pigContext;
         this.conf = conf;
-        tmpPath = new LinkedList<Path>();
+        jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
+    }
+
+    /**
+     * Returns all store locations of a previously compiled job
+     */
+    public List<POStore> getStores(Job job) {
+        Pair<List<POStore>, Path> pair = jobStoreMap.get(job);
+        if (pair != null && pair.first != null) {
+            return pair.first;
+        } else {
+            return new ArrayList<POStore>();
+        }
+    }
+
+    /**
+     * Resets the state
+     */
+    public void reset() {
+        jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
     }
 
     /**
@@ -126,26 +145,24 @@
      * This method should always be called after the job execution
      * completes.
      */
-    public void moveResults() throws IOException {
-        if (curTmpPath != null) {
-            tmpPath.add(curTmpPath);
-            curTmpPath = null;
-        }
-
-        for (Path tmp: tmpPath) {
-            Path abs = new Path(tmp, "abs");
-            Path rel = new Path(tmp, "rel");
-            FileSystem fs = tmp.getFileSystem(conf);
+    public void moveResults(List<Job> completedJobs) throws IOException {
+        for (Job job: completedJobs) {
+            Pair<List<POStore>, Path> pair = jobStoreMap.get(job);
+            if (pair != null && pair.second != null) {
+                Path tmp = pair.second;
+                Path abs = new Path(tmp, "abs");
+                Path rel = new Path(tmp, "rel");
+                FileSystem fs = tmp.getFileSystem(conf);
 
-            if (fs.exists(abs)) {
-                moveResults(abs, abs.toUri().getPath(), fs);
-            }
-            
-            if (fs.exists(rel)) {        
-                moveResults(rel, rel.toUri().getPath()+"/", fs);
+                if (fs.exists(abs)) {
+                    moveResults(abs, abs.toUri().getPath(), fs);
+                }
+                
+                if (fs.exists(rel)) {        
+                    moveResults(rel, rel.toUri().getPath()+"/", fs);
+                }
             }
         }
-        tmpPath = new LinkedList<Path>();
     }
 
     /**
@@ -171,19 +188,16 @@
         return new Path(pathStr);
     }
 
-    private void makeTmpPath() throws IOException {
-        if (curTmpPath != null) {
-            tmpPath.add(curTmpPath);
-        }
-
+    private Path makeTmpPath() throws IOException {
+        Path tmpPath = null;
         for (int tries = 0;;) {
             try {
-                curTmpPath = 
+                tmpPath = 
                     new Path(FileLocalizer
                              .getTemporaryPath(null, pigContext).toString());
-                FileSystem fs = curTmpPath.getFileSystem(conf);
-                curTmpPath = curTmpPath.makeQualified(fs);
-                fs.mkdirs(curTmpPath);
+                FileSystem fs = tmpPath.getFileSystem(conf);
+                tmpPath = tmpPath.makeQualified(fs);
+                fs.mkdirs(tmpPath);
                 break;
             } catch (IOException ioe) {
                 if (++tries==100) {
@@ -191,6 +205,7 @@
                 }
             }
         }
+        return tmpPath;
     }
 
     /**
@@ -220,7 +235,7 @@
             List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
             roots.addAll(plan.getRoots());
             for (MapReduceOper mro: roots) {
-                jobCtrl.addJob(new Job(getJobConf(mro, conf, pigContext)));
+                jobCtrl.addJob(getJob(mro, conf, pigContext));
                 plan.remove(mro);
             }
         } catch (JobCreationException jce) {
@@ -235,7 +250,7 @@
     }
         
     /**
-     * The method that creates the JobConf corresponding to a MapReduceOper.
+     * The method that creates the Job corresponding to a MapReduceOper.
      * The assumption is that
      * every MapReduceOper will have a load and a store. The JobConf removes
      * the load operator and serializes the input filespec so that PigInputFormat can
@@ -253,13 +268,15 @@
      * @param mro - The MapReduceOper for which the JobConf is required
      * @param conf - the Configuration object from which JobConf is built
      * @param pigContext - The PigContext passed on from execution engine
-     * @return JobConf corresponding to mro
+     * @return Job corresponding to mro
      * @throws JobCreationException
      */
-    private JobConf getJobConf(MapReduceOper mro, Configuration conf, PigContext pigContext) throws JobCreationException{
+    private Job getJob(MapReduceOper mro, Configuration conf, PigContext pigContext) throws JobCreationException{
         JobConf jobConf = new JobConf(conf);
         ArrayList<Pair<FileSpec, Boolean>> inp = new ArrayList<Pair<FileSpec, Boolean>>();
         ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
+        ArrayList<POStore> storeLocations = new ArrayList<POStore>();
+        Path tmpLocation = null;
         
         //Set the User Name for this job. This will be
         //used as the working directory
@@ -320,6 +337,14 @@
             List<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
             List<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan);
 
+            for (POStore st: mapStores) {
+                storeLocations.add(st);
+            }
+
+            for (POStore st: reduceStores) {
+                storeLocations.add(st);
+            }
+
             if (mapStores.size() + reduceStores.size() == 1) { // single store case
                 log.info("Setting up single store job");
                 
@@ -370,21 +395,22 @@
            else { // multi store case
                 log.info("Setting up multi store job");
 
-                makeTmpPath();
-                FileSystem fs = curTmpPath.getFileSystem(conf);
+                tmpLocation = makeTmpPath();
+
+                FileSystem fs = tmpLocation.getFileSystem(conf);
                 for (POStore st: mapStores) {
                     Path tmpOut = new Path(
-                        curTmpPath,
+                        tmpLocation,
                         PlanHelper.makeStoreTmpPath(st.getSFile().getFileName()));
                     fs.mkdirs(tmpOut);
                 }
 
                 jobConf.setOutputFormat(PigOutputFormat.class);
-                FileOutputFormat.setOutputPath(jobConf, curTmpPath);
+                FileOutputFormat.setOutputPath(jobConf, tmpLocation);
 
                 jobConf.set("pig.streaming.log.dir", 
-                            new Path(curTmpPath, LOG_DIR).toString());
-                jobConf.set("pig.streaming.task.output.dir", curTmpPath.toString());
+                            new Path(tmpLocation, LOG_DIR).toString());
+                jobConf.set("pig.streaming.task.output.dir", tmpLocation.toString());
            }
 
             // store map key type
@@ -475,7 +501,10 @@
                         ObjectSerializer.serialize(mro.getSortOrder()));
                 }
             }
-            return jobConf;
+            
+            Job job = new Job(jobConf);
+            jobStoreMap.put(job,new Pair(storeLocations, tmpLocation));
+            return job;
         } catch (JobCreationException jce) {
         	throw jce;
         } catch(Exception e) {
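
The compiler now remembers, per compiled Job, both the store operators it
writes and the temporary output directory it uses, so results are moved only
for jobs that actually completed. A sketch of how the launcher is expected to
drive this after a JobControl round; variable names (jc, jcc, log) follow the
surrounding Pig code, and the fragment is illustrative rather than the exact
MapReduceLauncher code:

    // Move output only for the jobs that succeeded and report their store locations.
    List<Job> completed = jc.getSuccessfulJobs();
    jcc.moveResults(completed);                  // walks jobStoreMap for each completed job
    for (Job job : completed) {
        for (POStore st : jcc.getStores(job)) {  // store operators recorded at compile time
            log.info("stored result in " + st.getSFile().getFileName());
        }
    }
    jcc.reset();                                 // drop the per-run bookkeeping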

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Fri May 15 17:14:35 2009
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.LinkedList;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -48,6 +49,7 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.tools.pigstats.PigStats;
 
 public abstract class Launcher {
@@ -58,6 +60,9 @@
     boolean pigException = false;
     boolean outOfMemory = false;
     final String OOM_ERR = "OutOfMemoryError";
+
+    protected List<FileSpec> succeededStores = null;
+    protected List<FileSpec> failedStores = null;
     
     protected Launcher(){
         totalHadoopTimeSpent = 0;
@@ -65,7 +70,36 @@
         if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
             newLine = "\r\n";
         }
+        reset();
+    }
+
+    /**
+     * Returns a list of locations of results that have been
+     * successfully completed.
+     * @return A list of filespecs that corresponds to the locations of
+     * the successful stores.
+     */
+    public List<FileSpec> getSucceededFiles() {
+        return succeededStores;
     }
+
+    /**
+     * Returns a list of locations of results that have failed.
+     * @return A list of filespecs that corresponds to the locations of
+     * the failed stores.
+     */
+    public List<FileSpec> getFailedFiles() {
+        return failedStores;
+    }
+
+    /**
+     * Resets the state after a launch
+     */
+    public void reset() {
+        succeededStores = new LinkedList<FileSpec>();
+        failedStores = new LinkedList<FileSpec>();
+    }
+
     /**
      * Method to launch pig for hadoop either for a cluster's
      * job tracker or for a local job runner. THe only difference
@@ -127,14 +161,14 @@
         JobID MRJobID = job.getAssignedJobID();
         String jobMessage = job.getMessage();
         if(MRJobID == null) {
-        	try {
+            try {
                 throw getExceptionFromString(jobMessage);
             } catch (Exception e) {
                 //just get the first line in the message and log the rest
                 String firstLine = getFirstLineFromMessage(jobMessage);
-
+                
                 LogUtils.writeLog(new Exception(jobMessage), pigContext.getProperties().getProperty("pig.logfile"), 
-                        log, false, null, false, false);
+                                  log, false, null, false, false);
                 int errCode = 2997;
                 String msg = "Unable to recreate exception from backend error: " + firstLine;
                 throw new ExecException(msg, errCode, PigException.BUG, e);
@@ -179,32 +213,32 @@
             	//this comparison is in place till Hadoop 0.20 provides methods to query
             	//job status            	
             	if(reports[i].getProgress() != successfulProgress) {
-            		jobFailed = true;
+                    jobFailed = true;
             	}
                 Set<String> errorMessageSet = new HashSet<String>();
                 for (int j = 0; j < msgs.length; j++) {                	
-	            	if(!errorMessageSet.contains(msgs[j])) {
-	            	    errorMessageSet.add(msgs[j]);
-		            	if (errNotDbg) {
-		            		//errNotDbg is used only for failed jobs
-		            	    //keep track of all the unique exceptions
+                    if(!errorMessageSet.contains(msgs[j])) {
+                        errorMessageSet.add(msgs[j]);
+                        if (errNotDbg) {
+                            //errNotDbg is used only for failed jobs
+                            //keep track of all the unique exceptions
                             try {
                                 Exception e = getExceptionFromString(msgs[j]);
                                 exceptions.add(e);
                             } catch (Exception e1) {
                                 String firstLine = getFirstLineFromMessage(msgs[j]);                                
                                 LogUtils.writeLog(new Exception(msgs[j]), pigContext.getProperties().getProperty("pig.logfile"), 
-                                        log, false, null, false, false);
+                                                  log, false, null, false, false);
                                 int errCode = 2997;
                                 String msg = "Unable to recreate exception from backed error: " + firstLine;
                                 throw new ExecException(msg, errCode, PigException.BUG, e1);
                             }
-		                } else {
-		                    log.debug("Error message from task (" + type + ") " +
-		                        reports[i].getTaskID() + msgs[j]);
-		                }
-	            	}
-	            }
+                        } else {
+                            log.debug("Error message from task (" + type + ") " +
+                                      reports[i].getTaskID() + msgs[j]);
+                        }
+                    }
+                }
             }
             
             //if its a failed job then check if there is more than one exception
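
The succeeded/failed store lists collected here are what the execution engines
turn into ExecJob objects. A condensed restatement of that consumption pattern,
essentially what HExecutionEngine.execute() above does (launcher, plan, jobName
and pigContext come from the caller):

    List<ExecJob> jobs = new ArrayList<ExecJob>();
    try {
        PigStats stats = launcher.launchPig(plan, jobName, pigContext);
        for (FileSpec spec : launcher.getSucceededFiles()) {
            jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec, stats));
        }
        for (FileSpec spec : launcher.getFailedFiles()) {
            HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, spec, stats);
            j.setException(launcher.getError(spec));   // backend error for this store, if captured
            jobs.add(j);
        }
    } finally {
        launcher.reset();   // clear succeededStores/failedStores before the next run
    }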

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri May 15 17:14:35 2009
@@ -229,7 +229,6 @@
 
         List<POStore> stores = PlanHelper.getStores(plan);
         for (POStore store: stores) {
-            FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
             compile(store);
         }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri May 15 17:14:35 2009
@@ -50,12 +50,15 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector.Message;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.ConfigurationValidator;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.tools.pigstats.PigStats;
 
 /**
@@ -68,16 +71,31 @@
     //used to track the exception thrown by the job control which is run in a separate thread
     private Exception jobControlException = null;
     private boolean aggregateWarning = false;
-    
+    private Map<FileSpec, Exception> failureMap;
+
+    /**
+     * Get the exception that caused a failure on the backend for a
+     * store location (if any).
+     */
+    public Exception getError(FileSpec spec) {
+        return failureMap.get(spec);
+    }
+
+    @Override
+    public void reset() {
+        failureMap = new HashMap<FileSpec, Exception>();
+        super.reset();
+    }
+
     @Override
     public PigStats launchPig(PhysicalPlan php,
-                             String grpName,
-                             PigContext pc) throws PlanException,
-                                                   VisitorException,
-                                                   IOException,
-                                                   ExecException,
-                                                   JobCreationException,
-                                                   Exception {
+                              String grpName,
+                              PigContext pc) throws PlanException,
+                                                    VisitorException,
+                                                    IOException,
+                                                    ExecException,
+                                                    JobCreationException,
+                                                    Exception {
         long sleepTime = 500;
         aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
         MROperPlan mrp = compile(php, pc);
@@ -140,8 +158,29 @@
 
             numMRJobsCompl += numMRJobsCurrent;
             failedJobs.addAll(jc.getFailedJobs());
-            succJobs.addAll(jc.getSuccessfulJobs());
-            jcc.moveResults();
+
+            if (!failedJobs.isEmpty() 
+                && "true".equalsIgnoreCase(
+                  pc.getProperties().getProperty("stop.on.failure","false"))) {
+                int errCode = 6017;
+                StringBuilder msg = new StringBuilder("Execution failed, while processing ");
+                
+                for (Job j: failedJobs) {
+                    List<POStore> sts = jcc.getStores(j);
+                    for (POStore st: sts) {
+                        msg.append(st.getSFile().getFileName());
+                        msg.append(", ");
+                    }
+                }
+                
+                throw new ExecException(msg.substring(0,msg.length()-2), 
+                                        errCode, PigException.REMOTE_ENVIRONMENT);
+            }
+
+            List<Job> jobs = jc.getSuccessfulJobs();
+            jcc.moveResults(jobs);
+            succJobs.addAll(jobs);
+            
             
             stats.setJobClient(jobClient);
             stats.setJobControl(jc);
@@ -150,40 +189,76 @@
             jc.stop(); 
         }
 
+        log.info( "100% complete");
+
+        boolean failed = false;
         // Look to see if any jobs failed.  If so, we need to report that.
         if (failedJobs != null && failedJobs.size() > 0) {
-            log.error("Map reduce job failed");
+            log.error(failedJobs.size()+" map reduce job(s) failed!");
+            Exception backendException = null;
+
             for (Job fj : failedJobs) {
-                getStats(fj, jobClient, true, pc);
+                
+                try {
+                    getStats(fj, jobClient, true, pc);
+                } catch (Exception e) {
+                    backendException = e;
+                }
+
+                List<POStore> sts = jcc.getStores(fj);
+                for (POStore st: sts) {
+                    if (!st.isTmpStore()) {
+                        failedStores.add(st.getSFile());
+                        failureMap.put(st.getSFile(), backendException);
+                    }
+
+                    FileLocalizer.registerDeleteOnFail(st.getSFile().getFileName(), pc);
+                    log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
+                }
             }
-            return null;
+            failed = true;
         }
 
         Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
                 
         if(succJobs!=null) {
             for(Job job : succJobs){
+                List<POStore> sts = jcc.getStores(job);
+                for (POStore st: sts) {
+                    if (!st.isTmpStore()) {
+                        succeededStores.add(st.getSFile());
+                    }
+                    log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
+                }
                 getStats(job,jobClient, false, pc);
                 if(aggregateWarning) {
-                	computeWarningAggregate(job, jobClient, warningAggMap);
+                    computeWarningAggregate(job, jobClient, warningAggMap);
                 }
             }
         }
         
         if(aggregateWarning) {
-        	CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
+            CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
         }
         
         if(stats.getPigStats().get(stats.getLastJobID()) == null)
-            log
-                    .warn("Jobs not found in the JobClient. Please try to use Local, Hadoop Distributed or Hadoop MiniCluster modes instead of Hadoop LocalExecution");
+            log.warn("Jobs not found in the JobClient. Please try to use Local, Hadoop Distributed or Hadoop MiniCluster modes instead of Hadoop LocalExecution");
         else {
             log.info("Records written : " + stats.getRecordsWritten());
             log.info("Bytes written : " + stats.getBytesWritten());
         }
 
-        log.info( "100% complete");
-        log.info("Success!");
+        if (!failed) {
+            log.info("Success!");
+        } else {
+            if (succJobs != null && succJobs.size() > 0) {
+                log.info("Some jobs have failed!");
+            } else {
+                log.info("Failed!");
+            }
+        }
+        jcc.reset();
+
         return stats;
     }
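
By default the launcher keeps going when a job fails and only reports the
failed stores at the end; with stop.on.failure set (or -F on the command line)
it aborts on the first failed job with error 6017. A small sketch of enabling
the property programmatically, mirroring what testStopOnFailure in TestGrunt
below does; the queries and paths are illustrative only:

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class StopOnFirstFailure {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("stop.on.failure", "true");   // same effect as pig -F
            PigServer pig = new PigServer(ExecType.MAPREDUCE, props);
            pig.setBatchOn();
            pig.registerQuery("A = load 'in1';");
            pig.registerQuery("store A into 'out1';");
            pig.registerQuery("B = load 'in2';");
            pig.registerQuery("store B into 'out2';");
            // With stop.on.failure set, a failure in either job makes executeBatch()
            // throw the ExecException (error 6017) instead of just logging it and
            // returning a FAILED ExecJob.
            pig.executeBatch();
        }
    }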
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Fri May 15 17:14:35 2009
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.Properties;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -143,31 +144,37 @@
         }
     }
 
-    public ExecJob execute(PhysicalPlan plan, String jobName)
-            throws ExecException {
+    public List<ExecJob> execute(PhysicalPlan plan, String jobName)
+        throws ExecException {
         try {
             PhysicalOperator leaf = (PhysicalOperator) plan.getLeaves().get(0);
-            FileSpec spec = null;
             if (!(leaf instanceof POStore)) {
                 String scope = leaf.getOperatorKey().getScope();
                 POStore str = new POStore(new OperatorKey(scope,
                         NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-                spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
-                        pigContext).toString(), new FuncSpec(BinStorage.class
-                        .getName()));
+                FileSpec spec = new FileSpec(FileLocalizer.getTemporaryPath(null,pigContext).toString(), 
+                                             new FuncSpec(BinStorage.class.getName()));
                 str.setSFile(spec);
                 plan.addAsLeaf(str);
-            } else {
-                spec = ((POStore) leaf).getSFile();
             }
 
             LocalPigLauncher launcher = new LocalPigLauncher();
+
+            List<ExecJob> jobs = new ArrayList<ExecJob>();
+                    
             PigStats stats = launcher.launchPig(plan, jobName, pigContext);
-            if (stats != null)
-                return new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext,
-                        spec, stats);
-            else
-                return new LocalJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
+            for (FileSpec fspec: launcher.getSucceededFiles()) {
+                jobs.add(new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, fspec, stats));
+            }
+            
+            for (FileSpec fspec: launcher.getFailedFiles()) {
+                jobs.add(new LocalJob(ExecJob.JOB_STATUS.FAILED, pigContext, fspec, stats));
+            }
+
+            launcher.reset();
+
+            return jobs;
+
         } catch (Exception e) {
             // There are a lot of exceptions thrown by the launcher. If this
             // is an ExecException, just let it through. Else wrap it.
@@ -178,7 +185,7 @@
         }
     }
 
-    public LocalJob submit(PhysicalPlan plan, String jobName)
+    public List<ExecJob> submit(PhysicalPlan plan, String jobName)
             throws ExecException {
         throw new UnsupportedOperationException();
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java Fri May 15 17:14:35 2009
@@ -158,4 +158,8 @@
     public void getSTDError(OutputStream error) throws ExecException {
         throw new UnsupportedOperationException();
     }
+
+    public Exception getException() {
+        return null;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Fri May 15 17:14:35 2009
@@ -39,9 +39,11 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.PigException;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 
@@ -95,7 +97,7 @@
                 log.info("running store with dependencies");
                 POStore[] st = new POStore[1];
                 st[0] = op;
-                failedJobs += runPipeline(st);
+                failedJobs += runPipeline(st, pc);
                 for (PhysicalOperator suc: sucs) {
                     php.disconnect(op, suc);
                 }
@@ -104,13 +106,21 @@
         }
                 
         // The remaining stores can be run together.
-        failedJobs += runPipeline(stores.toArray(new POStore[0]));
+        failedJobs += runPipeline(stores.toArray(new POStore[0]), pc);
         
         stats.accumulateStats();
 
         UDFFinishVisitor finisher = new UDFFinishVisitor(php, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(php));
         finisher.visit();
 
+        for (FileSpec spec: failedStores) {
+            log.info("Failed to produce result in: \""+spec.getFileName()+"\"");
+        }
+
+        for (FileSpec spec: succeededStores) {
+            log.info("Successfully stored result in: \""+spec.getFileName()+"\"");
+        }
+
         if (failedJobs == 0) {
             log.info("Records written : " + stats.getRecordsWritten());
             log.info("Bytes written : " + stats.getBytesWritten());
@@ -124,9 +134,8 @@
         return null;
 
     }
-    
 
-    private int runPipeline(POStore[] leaves) throws IOException, ExecException {
+    private int runPipeline(POStore[] leaves, PigContext pc) throws IOException, ExecException {
         BitSet bs = new BitSet(leaves.length);
         int failed = 0;
         while(true) {
@@ -146,9 +155,20 @@
                     leaves[i].cleanUp();
                     leaves[i].tearDown();
                     failed++;
-                    // fallthrough
+                    failedStores.add(leaves[i].getSFile());
+                    if ("true".equalsIgnoreCase(
+                        pc.getProperties().getProperty("stop.on.failure","false"))) {
+                        int errCode = 6017;
+                        String msg = "Execution failed, while processing "
+                            + leaves[i].getSFile().getFileName();
+                        
+                        throw new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT);
+                    }
+                    bs.set(i);
+                    break;
                 case POStatus.STATUS_EOP:
                     leaves[i].tearDown();
+                    succeededStores.add(leaves[i].getSFile());
                     // fallthrough
                 default:
                     bs.set(i);

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java Fri May 15 17:14:35 2009
@@ -81,11 +81,11 @@
         }
     }
 
-    public void exec() throws Throwable {
+    public int[] exec() throws Throwable {
         boolean verbose = "true".equalsIgnoreCase(pig.getPigContext().getProperties().getProperty("verbose"));
         try {
             parser.setInteractive(false);
-            parser.parseStopOnError();
+            return parser.parseStopOnError();
         } catch (Throwable t) {
             LogUtils.writeLog(t, pig.getPigContext().getProperties().getProperty("pig.logfile"), log, verbose);
             throw (t);

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri May 15 17:14:35 2009
@@ -58,6 +58,8 @@
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.WrappedIOException;
 import org.apache.pig.tools.pigscript.parser.ParseException;
@@ -107,7 +109,22 @@
             }
 
             if (!mLoadOnly) {
-                mPigServer.executeBatch();
+                List<ExecJob> jobs = mPigServer.executeBatch();
+                for(ExecJob job: jobs) {
+                    if (job.getStatus() == ExecJob.JOB_STATUS.FAILED) {
+                        mNumFailedJobs++;
+                        if (job.getException() != null) {
+                            LogUtils.writeLog(
+                              job.getException(), 
+                              mPigServer.getPigContext().getProperties().getProperty("pig.logfile"), 
+                              log, 
+                              "true".equalsIgnoreCase(mPigServer.getPigContext().getProperties().getProperty("verbose")));
+                        }
+                    }
+                    else {
+                        mNumSucceededJobs++;
+                    }
+                }
             }
         }
     }
@@ -118,9 +135,9 @@
         }
     }
 
-    public void parseStopOnError() throws IOException, ParseException
+    public int[] parseStopOnError() throws IOException, ParseException
     {
-	parseStopOnError(false);
+	return parseStopOnError(false);
     }
     
     /** 
@@ -130,7 +147,7 @@
      *
      * @throws IOException, ParseException
      */
-    public void parseStopOnError(boolean sameBatch) throws IOException, ParseException
+    public int[] parseStopOnError(boolean sameBatch) throws IOException, ParseException
     {
         if (mPigServer == null) {
             throw new IllegalStateException();
@@ -156,6 +173,8 @@
 		discardBatch();
 	    }
         }
+        int [] res = { mNumSucceededJobs, mNumFailedJobs };
+        return res;
     }
 
     public void setLoadOnly(boolean loadOnly) 
@@ -733,4 +752,6 @@
     private boolean mDone;
     private boolean mLoadOnly;
     private ExplainState mExplain;
+    private int mNumFailedJobs;
+    private int mNumSucceededJobs;
 }
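
For embedded clients, the per-job inspection added above works directly against the new List<ExecJob> returned by PigServer.executeBatch(). A minimal sketch, assuming a connected PigServer in batch mode, imports of java.util.List and org.apache.pig.backend.executionengine.ExecJob, and an illustrative error-handling policy:

    List<ExecJob> jobs = pigServer.executeBatch();
    for (ExecJob job : jobs) {
        if (job.getStatus() == ExecJob.JOB_STATUS.FAILED) {
            // surface the per-job cause rather than one generic failure message
            if (job.getException() != null) {
                System.err.println("job failed: " + job.getException().getMessage());
            }
        }
    }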

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java Fri May 15 17:14:35 2009
@@ -589,4 +589,100 @@
     
         grunt.exec();
     }
+
+    @Test
+    public void testKeepGoing() throws Throwable {
+        PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext context = server.getPigContext();
+        
+        String strCmd = 
+            "rmf bar;"
+            +"rmf foo;"
+            +"rmf baz;"
+            +"A = load 'file:test/org/apache/pig/test/data/passwd';"
+            +"B = foreach A generate 1;"
+            +"C = foreach A generate 0/0;"
+            +"store B into 'foo';"
+            +"store C into 'bar';"
+            +"A = load 'file:test/org/apache/pig/test/data/passwd';"
+            +"B = stream A through `false`;"
+            +"store B into 'baz';"
+            +"cat bar;";
+            
+        ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+        InputStreamReader reader = new InputStreamReader(cmd);
+        
+        Grunt grunt = new Grunt(new BufferedReader(reader), context);
+    
+        grunt.exec();
+    }
+
+    @Test
+    public void testKeepGoingFailed() throws Throwable {
+        PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext context = server.getPigContext();
+        
+        String strCmd = 
+            "rmf bar;"
+            +"rmf foo;"
+            +"rmf baz;"
+            +"A = load 'file:test/org/apache/pig/test/data/passwd';"
+            +"B = foreach A generate 1;"
+            +"C = foreach A generate 0/0;"
+            +"store B into 'foo';"
+            +"store C into 'bar';"
+            +"A = load 'file:test/org/apache/pig/test/data/passwd';"
+            +"B = stream A through `false`;"
+            +"store B into 'baz';"
+            +"cat baz;";
+            
+        ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+        InputStreamReader reader = new InputStreamReader(cmd);
+        
+        Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+        boolean caught = false;
+        try {
+            grunt.exec();
+        } catch (Exception e) {
+            caught = true;
+            assertTrue(e.getMessage().contains("baz does not exist"));
+        }
+        assertTrue(caught);
+    }
+
+    @Test
+    public void testStopOnFailure() throws Throwable {
+        PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext context = server.getPigContext();
+        context.getProperties().setProperty("stop.on.failure", "true");
+        
+        String strCmd = 
+            "rmf bar;\n"
+            +"rmf foo;\n"
+            +"rmf baz;\n"
+            +"copyFromLocal test/org/apache/pig/test/data/passwd pre;\n"
+            +"A = load 'file:test/org/apache/pig/test/data/passwd';\n"
+            +"B = stream A through `false`;\n"
+            +"store B into 'bar' using BinStorage();\n"
+            +"A = load 'bar';\n"
+            +"store A into 'foo';\n"
+            +"cp pre done;\n";
+            
+        ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+        InputStreamReader reader = new InputStreamReader(cmd);
+        
+        Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+        boolean caught = false;
+        try {
+            grunt.exec();
+        } catch (PigException e) {
+            caught = true;
+            assertTrue(e.getErrorCode() == 6017);
+        }
+
+        assertFalse(server.existsFile("done"));
+        assertTrue(caught);
+    }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Fri May 15 17:14:35 2009
@@ -22,6 +22,7 @@
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
@@ -30,6 +31,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.util.ExecTools;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
@@ -116,14 +118,15 @@
             myPig.registerQuery("c = group b by gid;");
             myPig.registerQuery("store c into '/tmp/output2';");
 
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertTrue(jobs.size() == 2);
 
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
         } 
     }
-    
+
     @Test
     public void testMultiQueryWithTwoLoads2() {
 
@@ -1665,5 +1668,4 @@
             Assert.fail();
         }
     }
-
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Fri May 15 17:14:35 2009
@@ -20,6 +20,7 @@
 import java.io.StringReader;
 import java.io.IOException;
 import java.io.File;
+import java.util.List;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Properties;
@@ -550,11 +551,15 @@
     }
 
     private boolean executePlan(PhysicalPlan pp) throws IOException {
+        boolean failed = true;
         FileLocalizer.clearDeleteOnFail();
-        ExecJob job = myPig.getPigContext().getExecutionEngine().execute(pp, "execute");
-        boolean failed = (job.getStatus() == ExecJob.JOB_STATUS.FAILED);
-        if (failed) {
-            FileLocalizer.triggerDeleteOnFail();
+        List<ExecJob> jobs = myPig.getPigContext().getExecutionEngine().execute(pp, "execute");
+        for (ExecJob job: jobs) {
+            failed = (job.getStatus() == ExecJob.JOB_STATUS.FAILED);
+            if (failed) {
+                FileLocalizer.triggerDeleteOnFail();
+                break;
+            }
         }
         return !failed;
     }