You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2009/11/06 01:47:25 UTC
svn commit: r833266 - in /hadoop/pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Author: daijy
Date: Fri Nov 6 00:47:25 2009
New Revision: 833266
URL: http://svn.apache.org/viewvc?rev=833266&view=rev
Log:
PIG-1001: Generate more meaningful error message when one input file does not exist
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=833266&r1=833265&r2=833266&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Nov 6 00:47:25 2009
@@ -173,6 +173,8 @@
PIG-790: Error message should indicate in which line number in the Pig script the error occurred (debugging BinCond) (daijy)
+PIG-1001: Generate more meaningful error message when one input file does not exist (daijy)
+
Release 0.5.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=833266&r1=833265&r2=833266&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Nov 6 00:47:25 2009
@@ -115,11 +115,14 @@
// A mapping of job to pair of store locations and tmp locations for that job
private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
+
+ private Map<Job, MapReduceOper> jobMroMap;
public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
this.pigContext = pigContext;
this.conf = conf;
jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
+ jobMroMap = new HashMap<Job, MapReduceOper>();
}
/**
@@ -139,6 +142,7 @@
*/
public void reset() {
jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
+ jobMroMap = new HashMap<Job, MapReduceOper>();
}
/**
@@ -237,8 +241,9 @@
List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
roots.addAll(plan.getRoots());
for (MapReduceOper mro: roots) {
- jobCtrl.addJob(getJob(mro, conf, pigContext));
- plan.remove(mro);
+ Job job = getJob(mro, conf, pigContext);
+ jobMroMap.put(job, mro);
+ jobCtrl.addJob(job);
}
} catch (JobCreationException jce) {
throw jce;
@@ -250,6 +255,34 @@
return jobCtrl;
}
+
+ // Update the Map-Reduce plan with the execution status of the jobs. If one job
+ // fails completely (the job has only one store and that job fails), then we
+ // remove all its dependent jobs. This method returns the number of MapReduceOper
+ // nodes removed from the Map-Reduce plan.
+ public int updateMROpPlan(List<Job> completeFailedJobs)
+ {
+ int sizeBefore = plan.size();
+ for (Job job : completeFailedJobs) // remove all subsequent jobs
+ {
+ MapReduceOper mrOper = jobMroMap.get(job);
+ plan.trimBelow(mrOper);
+ plan.remove(mrOper);
+ }
+
+ // Remove successful jobs from jobMroMap
+ for (Job job : jobMroMap.keySet())
+ {
+ if (!completeFailedJobs.contains(job))
+ {
+ MapReduceOper mro = jobMroMap.get(job);
+ plan.remove(mro);
+ }
+ }
+ jobMroMap.clear();
+ int sizeAfter = plan.size();
+ return sizeBefore-sizeAfter;
+ }
/**
* The method that creates the Job corresponding to a MapReduceOper.
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=833266&r1=833265&r2=833266&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Nov 6 00:47:25 2009
@@ -114,11 +114,11 @@
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
List<Job> failedJobs = new LinkedList<Job>();
+ List<Job> completeFailedJobsInThisRun = new LinkedList<Job>();
List<Job> succJobs = new LinkedList<Job>();
JobControl jc;
int totalMRJobs = mrp.size();
int numMRJobsCompl = 0;
- int numMRJobsCurrent = 0;
double lastProg = -1;
//create the exception handler for the job control thread
@@ -128,7 +128,7 @@
while((jc = jcc.compile(mrp, grpName)) != null) {
List<Job> waitingJobs = jc.getWaitingJobs();
- numMRJobsCurrent = waitingJobs.size();
+ completeFailedJobsInThisRun.clear();
Thread jcThread = new Thread(jc);
jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
@@ -181,40 +181,50 @@
//if the job controller fails before launching the jobs then there are
//no jobs to check for failure
if(jobControlException != null) {
- if(jobControlException instanceof PigException) {
- if(jobControlExceptionStackTrace != null) {
- LogUtils.writeLog("Error message from job controller", jobControlExceptionStackTrace,
- pc.getProperties().getProperty("pig.logfile"),
- log);
- }
- throw jobControlException;
- } else {
- int errCode = 2117;
- String msg = "Unexpected error when launching map reduce job.";
- throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
- }
+ if(jobControlException instanceof PigException) {
+ if(jobControlExceptionStackTrace != null) {
+ LogUtils.writeLog("Error message from job controller", jobControlExceptionStackTrace,
+ pc.getProperties().getProperty("pig.logfile"),
+ log);
+ }
+ throw jobControlException;
+ } else {
+ int errCode = 2117;
+ String msg = "Unexpected error when launching map reduce job.";
+ throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
+ }
}
- numMRJobsCompl += numMRJobsCurrent;
- failedJobs.addAll(jc.getFailedJobs());
-
- if (!failedJobs.isEmpty()
- && "true".equalsIgnoreCase(
+ if (!jc.getFailedJobs().isEmpty() )
+ {
+ if ("true".equalsIgnoreCase(
pc.getProperties().getProperty("stop.on.failure","false"))) {
- int errCode = 6017;
- StringBuilder msg = new StringBuilder("Execution failed, while processing ");
-
- for (Job j: failedJobs) {
- List<POStore> sts = jcc.getStores(j);
- for (POStore st: sts) {
- msg.append(st.getSFile().getFileName());
- msg.append(", ");
+ int errCode = 6017;
+ StringBuilder msg = new StringBuilder();
+
+ for (int i=0;i<jc.getFailedJobs().size();i++) {
+ Job j = jc.getFailedJobs().get(i);
+ msg.append(getFirstLineFromMessage(j.getMessage()));
+ if (i!=jc.getFailedJobs().size()-1)
+ msg.append("\n");
}
+
+ throw new ExecException(msg.toString(),
+ errCode, PigException.REMOTE_ENVIRONMENT);
}
-
- throw new ExecException(msg.substring(0,msg.length()-2),
- errCode, PigException.REMOTE_ENVIRONMENT);
+ // If we have only one store and that job fails, then we are sure that the job failed completely, and we shall stop its dependent jobs
+ for (Job job : jc.getFailedJobs())
+ {
+ List<POStore> sts = jcc.getStores(job);
+ if (sts.size()==1)
+ completeFailedJobsInThisRun.add(job);
+ }
+ failedJobs.addAll(jc.getFailedJobs());
}
+
+ int removedMROp = jcc.updateMROpPlan(completeFailedJobsInThisRun);
+
+ numMRJobsCompl += removedMROp;
List<Job> jobs = jc.getSuccessfulJobs();
jcc.moveResults(jobs);
@@ -248,13 +258,13 @@
List<POStore> sts = jcc.getStores(fj);
for (POStore st: sts) {
if (!st.isTmpStore()) {
- failedStores.add(st.getSFile());
- failureMap.put(st.getSFile(), backendException);
finalStores++;
+ log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
}
-
+ failedStores.add(st.getSFile());
+ failureMap.put(st.getSFile(), backendException);
FileLocalizer.registerDeleteOnFail(st.getSFile().getFileName(), pc);
- log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
+ //log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
}
}
failed = true;
@@ -269,8 +279,10 @@
if (!st.isTmpStore()) {
succeededStores.add(st.getSFile());
finalStores++;
+ log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
}
- log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
+ else
+ log.debug("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
}
getStats(job,jobClient, false, pc);
if(aggregateWarning) {
@@ -303,7 +315,7 @@
log.info("Success!");
} else {
if (succJobs != null && succJobs.size() > 0) {
- log.info("Some jobs have failed!");
+ log.info("Some jobs have failed! Stop running all dependent jobs");
} else {
log.info("Failed!");
}