Posted to commits@pig.apache.org by da...@apache.org on 2009/11/06 01:47:25 UTC

svn commit: r833266 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java

Author: daijy
Date: Fri Nov  6 00:47:25 2009
New Revision: 833266

URL: http://svn.apache.org/viewvc?rev=833266&view=rev
Log:
PIG-1001: Generate more meaningful error message when one input file does not exist

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=833266&r1=833265&r2=833266&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Nov  6 00:47:25 2009
@@ -173,6 +173,8 @@
 
 PIG-790: Error message should indicate in which line number in the Pig script the error occurred (debugging BinCond) (daijy)
 
+PIG-1001: Generate more meaningful error message when one input file does not exist (daijy)
+
 Release 0.5.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=833266&r1=833265&r2=833266&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Nov  6 00:47:25 2009
@@ -115,11 +115,14 @@
     
     // A mapping of job to pair of store locations and tmp locations for that job
     private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
+    
+    private Map<Job, MapReduceOper> jobMroMap;
 
     public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
         this.pigContext = pigContext;
         this.conf = conf;
         jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
+        jobMroMap = new HashMap<Job, MapReduceOper>();
     }
 
     /**
@@ -139,6 +142,7 @@
      */
     public void reset() {
         jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
+        jobMroMap = new HashMap<Job, MapReduceOper>();
     }
 
     /**
@@ -237,8 +241,9 @@
             List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
             roots.addAll(plan.getRoots());
             for (MapReduceOper mro: roots) {
-                jobCtrl.addJob(getJob(mro, conf, pigContext));
-                plan.remove(mro);
+                Job job = getJob(mro, conf, pigContext);
+                jobMroMap.put(job, mro);
+                jobCtrl.addJob(job);
             }
         } catch (JobCreationException jce) {
         	throw jce;
@@ -250,6 +255,34 @@
 
         return jobCtrl;
     }
+    
+    // Update the Map-Reduce plan with the execution status of the jobs. If a job
+    // fails completely (the job has only one store and that job fails), then we
+    // remove all of its dependent jobs. This method returns the number of
+    // MapReduceOpers removed from the Map-Reduce plan.
+    public int updateMROpPlan(List<Job> completeFailedJobs)
+    {
+        int sizeBefore = plan.size();
+        for (Job job : completeFailedJobs)  // remove all subsequent jobs
+        {
+            MapReduceOper mrOper = jobMroMap.get(job); 
+            plan.trimBelow(mrOper);
+            plan.remove(mrOper);
+        }
+
+        // Remove the operators for the remaining (non-completely-failed) jobs from the plan
+        for (Job job : jobMroMap.keySet())
+        {
+            if (!completeFailedJobs.contains(job))
+            {
+                MapReduceOper mro = jobMroMap.get(job);
+                plan.remove(mro);
+            }
+        }
+        jobMroMap.clear();
+        int sizeAfter = plan.size();
+        return sizeBefore-sizeAfter;
+    }
         
     /**
      * The method that creates the Job corresponding to a MapReduceOper.

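Note on the mechanics of updateMROpPlan() above: plan.trimBelow(mrOper) discards every operator that depends on the failed one, and the returned count feeds the launcher's progress accounting (see the MapReduceLauncher diff below). The following standalone sketch models that trimming on a toy DAG; it is a minimal sketch only, and the class, field, and method names are illustrative assumptions, not Pig's real API.

    import java.util.*;

    // Toy model of removing a completely failed operator and its dependents
    // from a plan, in the spirit of plan.trimBelow(mro); plan.remove(mro).
    public class PlanTrimSketch {
        // op -> the ops it depends on (its parents in the plan DAG)
        private static final Map<String, List<String>> deps =
            new HashMap<String, List<String>>();

        // Remove "failed" and, recursively, every op that depends on it,
        // since those ops can no longer produce a result.
        private static void trimBelow(Set<String> plan, String failed) {
            plan.remove(failed);
            for (Map.Entry<String, List<String>> e : deps.entrySet()) {
                if (plan.contains(e.getKey()) && e.getValue().contains(failed)) {
                    trimBelow(plan, e.getKey());
                }
            }
        }

        public static void main(String[] args) {
            Set<String> plan =
                new HashSet<String>(Arrays.asList("load", "join", "store"));
            deps.put("join", Arrays.asList("load"));
            deps.put("store", Arrays.asList("join"));

            int sizeBefore = plan.size();
            trimBelow(plan, "load");  // pretend the "load" job failed completely
            // Prints "removed 3 ops" -- the analogue of updateMROpPlan's return value.
            System.out.println("removed " + (sizeBefore - plan.size()) + " ops");
        }
    }
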
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=833266&r1=833265&r2=833266&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Nov  6 00:47:25 2009
@@ -114,11 +114,11 @@
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         
         List<Job> failedJobs = new LinkedList<Job>();
+        List<Job> completeFailedJobsInThisRun = new LinkedList<Job>();
         List<Job> succJobs = new LinkedList<Job>();
         JobControl jc;
         int totalMRJobs = mrp.size();
         int numMRJobsCompl = 0;
-        int numMRJobsCurrent = 0;
         double lastProg = -1;
         
         //create the exception handler for the job control thread
@@ -128,7 +128,7 @@
         while((jc = jcc.compile(mrp, grpName)) != null) {
             
             List<Job> waitingJobs = jc.getWaitingJobs();
-            numMRJobsCurrent = waitingJobs.size();
+            completeFailedJobsInThisRun.clear();
             
             Thread jcThread = new Thread(jc);
             jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
@@ -181,40 +181,50 @@
             //if the job controller fails before launching the jobs then there are
             //no jobs to check for failure
             if(jobControlException != null) {
-        	if(jobControlException instanceof PigException) {
-        	        if(jobControlExceptionStackTrace != null) {
-        	            LogUtils.writeLog("Error message from job controller", jobControlExceptionStackTrace, 
-        	                    pc.getProperties().getProperty("pig.logfile"), 
-                                log);
-        	        }
-                    throw jobControlException;
-        	} else {
-                    int errCode = 2117;
-                    String msg = "Unexpected error when launching map reduce job.";        	
-                    throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
-        	}
+            	if(jobControlException instanceof PigException) {
+            	        if(jobControlExceptionStackTrace != null) {
+            	            LogUtils.writeLog("Error message from job controller", jobControlExceptionStackTrace, 
+            	                    pc.getProperties().getProperty("pig.logfile"), 
+                                    log);
+            	        }
+                        throw jobControlException;
+            	} else {
+                        int errCode = 2117;
+                        String msg = "Unexpected error when launching map reduce job.";        	
+                        throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
+            	}
             }
 
-            numMRJobsCompl += numMRJobsCurrent;
-            failedJobs.addAll(jc.getFailedJobs());
-
-            if (!failedJobs.isEmpty() 
-                && "true".equalsIgnoreCase(
+            if (!jc.getFailedJobs().isEmpty() )
+            {
+                if ("true".equalsIgnoreCase(
                   pc.getProperties().getProperty("stop.on.failure","false"))) {
-                int errCode = 6017;
-                StringBuilder msg = new StringBuilder("Execution failed, while processing ");
-                
-                for (Job j: failedJobs) {
-                    List<POStore> sts = jcc.getStores(j);
-                    for (POStore st: sts) {
-                        msg.append(st.getSFile().getFileName());
-                        msg.append(", ");
+                    int errCode = 6017;
+                    StringBuilder msg = new StringBuilder();
+                    
+                    for (int i=0;i<jc.getFailedJobs().size();i++) {
+                        Job j = jc.getFailedJobs().get(i);
+                        msg.append(getFirstLineFromMessage(j.getMessage()));
+                        if (i!=jc.getFailedJobs().size()-1)
+                            msg.append("\n");
                     }
+                    
+                    throw new ExecException(msg.toString(), 
+                                            errCode, PigException.REMOTE_ENVIRONMENT);
                 }
-                
-                throw new ExecException(msg.substring(0,msg.length()-2), 
-                                        errCode, PigException.REMOTE_ENVIRONMENT);
+                // If a job has only one store and that job fails, we are sure the job failed completely, and we should stop its dependent jobs
+                for (Job job : jc.getFailedJobs())
+                {
+                    List<POStore> sts = jcc.getStores(job);
+                    if (sts.size()==1)
+                        completeFailedJobsInThisRun.add(job);
+                }
+                failedJobs.addAll(jc.getFailedJobs());
             }
+            
+            int removedMROp = jcc.updateMROpPlan(completeFailedJobsInThisRun);
+            
+            numMRJobsCompl += removedMROp;
 
             List<Job> jobs = jc.getSuccessfulJobs();
             jcc.moveResults(jobs);
@@ -248,13 +258,13 @@
                 List<POStore> sts = jcc.getStores(fj);
                 for (POStore st: sts) {
                     if (!st.isTmpStore()) {
-                        failedStores.add(st.getSFile());
-                        failureMap.put(st.getSFile(), backendException);
                         finalStores++;
+                        log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
                     }
-
+                    failedStores.add(st.getSFile());
+                    failureMap.put(st.getSFile(), backendException);
                     FileLocalizer.registerDeleteOnFail(st.getSFile().getFileName(), pc);
-                    log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
+                    //log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
                 }
             }
             failed = true;
@@ -269,8 +279,10 @@
                     if (!st.isTmpStore()) {
                         succeededStores.add(st.getSFile());
                         finalStores++;
+                        log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
                     }
-                    log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
+                    else
+                        log.debug("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
                 }
                 getStats(job,jobClient, false, pc);
                 if(aggregateWarning) {
@@ -303,7 +315,7 @@
             log.info("Success!");
         } else {
             if (succJobs != null && succJobs.size() > 0) {
-                log.info("Some jobs have failed!");
+                log.info("Some jobs have failed! Stop running all dependent jobs");
             } else {
                 log.info("Failed!");
             }
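
The stop.on.failure branch above builds the user-facing error from the first line of each failed job's message via getFirstLineFromMessage(), which is referenced but not defined in this diff. A minimal helper consistent with that call site might look like the sketch below; this is an assumption about its behavior, not the committed code.

    // Hypothetical helper (not part of this diff): keep only the first line
    // of a potentially multi-line backend message so the aggregated error
    // stays one line per failed job.
    private static String getFirstLineFromMessage(String message) {
        if (message == null) {
            return "";
        }
        int newline = message.indexOf('\n');
        return newline == -1 ? message : message.substring(0, newline);
    }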