Posted to commits@pig.apache.org by ol...@apache.org on 2009/05/15 19:14:35 UTC
svn commit: r775239 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backe...
Author: olga
Date: Fri May 15 17:14:35 2009
New Revision: 775239
URL: http://svn.apache.org/viewvc?rev=775239&view=rev
Log:
PIG-781: Error reporting for failed MR jobs (hagleitn via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/Main.java
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri May 15 17:14:35 2009
@@ -76,6 +76,8 @@
PIG-799: Unit tests on windows are failing after multiquery commit (daijy)
+PIG-781: Error reporting for failed MR jobs (hagleitn via olgan)
+
Release 0.2.0
INCOMPATIBLE CHANGES
Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Fri May 15 17:14:35 2009
@@ -99,12 +99,13 @@
opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
opts.registerOpt('o', "hod", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('p', "param", CmdLineParser.ValueExpected.OPTIONAL);
- opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.OPTIONAL);
opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('t', "optimizer_off", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('v', "verbose", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('w', "warning", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('F', "stop_on_failure", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+ opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.NOT_ACCEPTED);
ExecMode mode = ExecMode.UNKNOWN;
String file = null;
@@ -122,8 +123,12 @@
//by default warning aggregation is on
properties.setProperty("aggregate.warning", ""+true);
+ //by default multiquery optimization is on
properties.setProperty("opt.multiquery", ""+true);
+ //by default we keep going on error on the backend
+ properties.setProperty("stop.on.failure", ""+false);
+
char opt;
while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
switch (opt) {
@@ -164,6 +169,10 @@
file = opts.getValStr();
break;
+ case 'F':
+ properties.setProperty("stop.on.failure", ""+true);
+ break;
+
case 'h':
usage();
return;
@@ -359,8 +368,9 @@
logFileName = validateLogFile(logFileName, remainders[0]);
pigContext.getProperties().setProperty("pig.logfile", logFileName);
- if (!debug)
+ if (!debug) {
new File(substFile).deleteOnExit();
+ }
// Set job name based on name of the script
pigContext.getProperties().setProperty(PigContext.JOB_NAME,
@@ -369,8 +379,21 @@
grunt = new Grunt(pin, pigContext);
gruntCalled = true;
- grunt.exec();
- rc = 0;
+ int[] results = grunt.exec();
+ if (results[1] == 0) {
+ // no failed jobs
+ rc = 0;
+ }
+ else {
+ if (results[0] == 0) {
+ // no succeeded jobs
+ rc = 2;
+ }
+ else {
+ // some jobs have failed
+ rc = 3;
+ }
+ }
return;
}
@@ -387,6 +410,7 @@
} else {
rc = 2;
}
+
if(!gruntCalled) {
LogUtils.writeLog(pe, logFileName, log, verbose);
}
@@ -495,30 +519,31 @@
public static void usage()
{
System.out.println("\n"+getVersionString()+"\n");
- System.out.println("USAGE: Pig [options] [-] : Run interactively in grunt shell.");
- System.out.println(" Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).");
- System.out.println(" Pig [options] [-f[ile]] file : Run cmds found in file.");
- System.out.println(" options include:");
- System.out.println(" -4, -log4jconf log4j configuration file, overrides log conf");
- System.out.println(" -b, -brief brief logging (no timestamps)");
- System.out.println(" -c, -cluster clustername, kryptonite is default");
- System.out.println(" -d, -debug debug level, INFO is default");
- System.out.println(" -e, -execute commands to execute (within quotes)");
- System.out.println(" -f, -file path to the script to execute");
- System.out.println(" -h, -help display this message");
- System.out.println(" -i, -version display version information");
- System.out.println(" -j, -jar jarfile load jarfile");
- System.out.println(" -l, -logfile path to client side log file; current working directory is default");
- System.out.println(" -m, -param_file path to the parameter file");
- System.out.println(" -o, -hod read hod server from system property ssh.gateway");
- System.out.println(" -p, -param key value pair of the form param=val");
- System.out.println(" -r, -dryrun CmdLineParser.ValueExpected.NOT_ACCEPTED");
- System.out.println(" -t, -optimizer_off optimizer rule name, turn optimizer off for this rule; use all to turn all rules off, optimizer is turned on by default");
- System.out.println(" -v, -verbose print all error messages to screen");
- System.out.println(" -w, -warning turn warning on; also turns warning aggregation off");
- System.out.println(" -x, -exectype local|mapreduce, mapreduce is default");
+ System.out.println("USAGE: Pig [options] [-] : Run interactively in grunt shell.");
+ System.out.println(" Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).");
+ System.out.println(" Pig [options] [-f[ile]] file : Run cmds found in file.");
+ System.out.println(" options include:");
+ System.out.println(" -4, -log4jconf log4j configuration file, overrides log conf");
+ System.out.println(" -b, -brief brief logging (no timestamps)");
+ System.out.println(" -c, -cluster clustername, kryptonite is default");
+ System.out.println(" -d, -debug debug level, INFO is default");
+ System.out.println(" -e, -execute commands to execute (within quotes)");
+ System.out.println(" -f, -file path to the script to execute");
+ System.out.println(" -h, -help display this message");
+ System.out.println(" -i, -version display version information");
+ System.out.println(" -j, -jar jarfile load jarfile");
+ System.out.println(" -l, -logfile path to client side log file; current working directory is default");
+ System.out.println(" -m, -param_file path to the parameter file");
+ System.out.println(" -o, -hod read hod server from system property ssh.gateway");
+ System.out.println(" -p, -param key value pair of the form param=val");
+ System.out.println(" -r, -dryrun CmdLineParser.ValueExpected.NOT_ACCEPTED");
+ System.out.println(" -t, -optimizer_off optimizer rule name, turn optimizer off for this rule; use all to turn all rules off, optimizer is turned on by default");
+ System.out.println(" -v, -verbose print all error messages to screen");
+ System.out.println(" -w, -warning turn warning on; also turns warning aggregation off");
+ System.out.println(" -x, -exectype local|mapreduce, mapreduce is default");
- System.out.println(" -M, -no_multiquery turn multiquery optimization off; Multiquery is on by default");
+ System.out.println(" -F, -stop_on_failure aborts execution on the first failed job; off by default");
+ System.out.println(" -M, -no_multiquery turn multiquery optimization off; Multiquery is on by default");
}
private static String validateLogFile(String logFileName, String scriptName) {
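
The new -F/-stop_on_failure switch simply sets the stop.on.failure property that both launchers consult. A minimal sketch of enabling the same behavior from embedding code, based on the PigServer API as patched below (class name and the omitted queries are illustrative):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.impl.PigContext;

    public class StopOnFailureExample {
        public static void main(String[] args) throws Exception {
            // Equivalent of launching the CLI with -F: abort the script
            // as soon as one MR job fails instead of running the rest.
            PigServer server = new PigServer(ExecType.MAPREDUCE);
            PigContext context = server.getPigContext();
            context.getProperties().setProperty("stop.on.failure", "" + true);
            // ... register and execute queries as usual ...
        }
    }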
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri May 15 17:14:35 2009
@@ -24,6 +24,7 @@
import java.io.PrintStream;
import java.net.URL;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.Collection;
import java.util.Date;
import java.util.Enumeration;
@@ -227,10 +228,10 @@
* @throws FrontendException
* @throws ExecException
*/
- public void executeBatch() throws FrontendException, ExecException {
+ public List<ExecJob> executeBatch() throws FrontendException, ExecException {
if (!isMultiQuery) {
// ignore if multiquery is off
- return;
+ return new LinkedList<ExecJob>();
}
if (currDAG == null || !isBatchOn()) {
@@ -239,7 +240,7 @@
throw new FrontendException(msg, errCode, PigException.INPUT);
}
- currDAG.execute();
+ return currDAG.execute();
}
/**
@@ -453,7 +454,11 @@
// invocation of "execute" is synchronous!
if (job.getStatus() == JOB_STATUS.COMPLETED) {
- return job.getResults();
+ return job.getResults();
+ } else if (job.getStatus() == JOB_STATUS.FAILED
+ && job.getException() != null) {
+ // throw the backend exception in the failed case
+ throw job.getException();
} else {
throw new IOException("Job terminated with anomalous status "
+ job.getStatus().toString());
@@ -484,8 +489,9 @@
String filename,
String func) throws IOException {
- if (!currDAG.getAliasOp().containsKey(id))
+ if (!currDAG.getAliasOp().containsKey(id)) {
throw new IOException("Invalid alias: " + id);
+ }
try {
LogicalPlan lp = compileLp(id);
@@ -507,7 +513,11 @@
}
LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf);
- return executeCompiledLogicalPlan(storePlan);
+ List<ExecJob> jobs = executeCompiledLogicalPlan(storePlan);
+ if (jobs.size() < 1) {
+ throw new IOException("Couldn't retrieve job.");
+ }
+ return jobs.get(0);
} catch (Exception e) {
int errCode = 1002;
String msg = "Unable to store alias " + id;
@@ -734,30 +744,34 @@
return lp;
}
- private ExecJob execute(String alias) throws FrontendException, ExecException {
+ private List<ExecJob> execute(String alias) throws FrontendException, ExecException {
LogicalPlan typeCheckedLp = compileLp(alias);
if (typeCheckedLp.size() == 0) {
- return null;
+ return new LinkedList<ExecJob>();
}
LogicalOperator op = typeCheckedLp.getLeaves().get(0);
if (op instanceof LODefine) {
log.info("Skip execution of DEFINE only logical plan.");
- return null;
+ return new LinkedList<ExecJob>();
}
return executeCompiledLogicalPlan(typeCheckedLp);
}
- private ExecJob executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
+ private List<ExecJob> executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
PhysicalPlan pp = compilePp(compiledLp);
// execute using appropriate engine
FileLocalizer.clearDeleteOnFail();
- ExecJob execJob = pigContext.getExecutionEngine().execute(pp, "execute");
- if (execJob.getStatus()==ExecJob.JOB_STATUS.FAILED)
- FileLocalizer.triggerDeleteOnFail();
- return execJob;
+ List<ExecJob> execJobs = pigContext.getExecutionEngine().execute(pp, "execute");
+ for (ExecJob execJob: execJobs) {
+ if (execJob.getStatus()==ExecJob.JOB_STATUS.FAILED) {
+ FileLocalizer.triggerDeleteOnFail();
+ break;
+ }
+ }
+ return execJobs;
}
private LogicalPlan compileLp(
@@ -912,10 +926,11 @@
boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); }
- void execute() throws ExecException, FrontendException {
+ List<ExecJob> execute() throws ExecException, FrontendException {
pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName);
- PigServer.this.execute(null);
+ List<ExecJob> jobs = PigServer.this.execute(null);
processedStores = storeOpTable.keySet().size();
+ return jobs;
}
void markAsExecuted() {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java Fri May 15 17:14:35 2009
@@ -101,4 +101,9 @@
public void getSTDOut(OutputStream out) throws ExecException;
public void getSTDError(OutputStream error) throws ExecException;
+
+ /**
+ * Get exceptions that happened during execution
+ */
+ public Exception getException();
}
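
With getException() on ExecJob and executeBatch() returning one job per store (see the PigServer hunk above), embedding code can report per-store outcomes. A hedged sketch of a caller -- the queries and the exit-code mapping are illustrative, mirroring what Main.java now derives from Grunt's counts:

    import java.util.List;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.backend.executionengine.ExecJob;

    public class BatchStatusExample {
        // Returns 0 if all stores succeeded, 2 if all failed, 3 if mixed.
        static int runBatch() throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            pig.setBatchOn();
            pig.registerQuery("A = load 'input';");
            pig.registerQuery("store A into 'output';");
            List<ExecJob> jobs = pig.executeBatch(); // one ExecJob per store
            int failed = 0;
            for (ExecJob job : jobs) {
                if (job.getStatus() == ExecJob.JOB_STATUS.FAILED) {
                    failed++;
                    if (job.getException() != null) { // backend cause, if recorded
                        job.getException().printStackTrace();
                    }
                }
            }
            if (failed == 0) return 0;
            return failed == jobs.size() ? 2 : 3;
        }
    }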
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Fri May 15 17:14:35 2009
@@ -19,6 +19,7 @@
package org.apache.pig.backend.executionengine;
import java.io.PrintStream;
+import java.util.List;
import java.util.Collection;
import java.util.Properties;
import java.util.Map;
@@ -105,8 +106,8 @@
* @param jobName Name of this plan, will be used to identify the plan
* @throws ExecException
*/
- public ExecJob execute(PhysicalPlan plan,
- String jobName) throws ExecException;
+ public List<ExecJob> execute(PhysicalPlan plan,
+ String jobName) throws ExecException;
/**
* Execute the physical plan in non-blocking mode
@@ -115,8 +116,8 @@
* @param jobName Name of this plan, will be used to identify the plan
* @throws ExecException
*/
- public ExecJob submit(PhysicalPlan plan,
- String jobName) throws ExecException;
+ public List<ExecJob> submit(PhysicalPlan plan,
+ String jobName) throws ExecException;
/**
* Explain executor specific information.
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri May 15 17:14:35 2009
@@ -34,6 +34,9 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Collection;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
@@ -72,9 +75,9 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.shock.SSHSocketImplFactory;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.tools.pigstats.PigStats;
-
public class HExecutionEngine implements ExecutionEngine {
private static final String HOD_SERVER = "hod.server";
@@ -253,18 +256,25 @@
}
}
- public ExecJob execute(PhysicalPlan plan,
- String jobName) throws ExecException {
- try {
- FileSpec spec = ExecTools.checkLeafIsStore(plan, pigContext);
+ public List<ExecJob> execute(PhysicalPlan plan,
+ String jobName) throws ExecException {
+ MapReduceLauncher launcher = new MapReduceLauncher();
+ List<ExecJob> jobs = new ArrayList<ExecJob>();
- MapReduceLauncher launcher = new MapReduceLauncher();
+ try {
PigStats stats = launcher.launchPig(plan, jobName, pigContext);
- if(stats != null)
- return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec, stats);
- else
- return new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
+ for (FileSpec spec: launcher.getSucceededFiles()) {
+ jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec, stats));
+ }
+
+ for (FileSpec spec: launcher.getFailedFiles()) {
+ HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, spec, stats);
+ j.setException(launcher.getError(spec));
+ jobs.add(j);
+ }
+
+ return jobs;
} catch (Exception e) {
// There are a lot of exceptions thrown by the launcher. If this
// is an ExecException, just let it through. Else wrap it.
@@ -274,11 +284,13 @@
String msg = "Unexpected error during execution.";
throw new ExecException(msg, errCode, PigException.BUG, e);
}
+ } finally {
+ launcher.reset();
}
}
- public ExecJob submit(PhysicalPlan plan,
+ public List<ExecJob> submit(PhysicalPlan plan,
String jobName) throws ExecException {
throw new UnsupportedOperationException();
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri May 15 17:14:35 2009
@@ -45,6 +45,7 @@
protected JOB_STATUS status;
protected PigContext pigContext;
protected FileSpec outFileSpec;
+ protected Exception backendException;
private PigStats stats;
public HJob(JOB_STATUS status,
@@ -161,4 +162,12 @@
public void getSTDError(OutputStream error) throws ExecException {
throw new UnsupportedOperationException();
}
+
+ public void setException(Exception e) {
+ backendException = e;
+ }
+
+ public Exception getException() {
+ return backendException;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri May 15 17:14:35 2009
@@ -108,13 +108,32 @@
public static final String LOG_DIR = "_logs";
- private List<Path> tmpPath;
- private Path curTmpPath;
+ // A mapping of job to pair of store locations and tmp locations for that job
+ private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
this.pigContext = pigContext;
this.conf = conf;
- tmpPath = new LinkedList<Path>();
+ jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
+ }
+
+ /**
+ * Returns all store locations of a previously compiled job
+ */
+ public List<POStore> getStores(Job job) {
+ Pair<List<POStore>, Path> pair = jobStoreMap.get(job);
+ if (pair != null && pair.first != null) {
+ return pair.first;
+ } else {
+ return new ArrayList<POStore>();
+ }
+ }
+
+ /**
+ * Resets the state
+ */
+ public void reset() {
+ jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
}
/**
@@ -126,26 +145,24 @@
* This method should always be called after the job execution
* completes.
*/
- public void moveResults() throws IOException {
- if (curTmpPath != null) {
- tmpPath.add(curTmpPath);
- curTmpPath = null;
- }
-
- for (Path tmp: tmpPath) {
- Path abs = new Path(tmp, "abs");
- Path rel = new Path(tmp, "rel");
- FileSystem fs = tmp.getFileSystem(conf);
+ public void moveResults(List<Job> completedJobs) throws IOException {
+ for (Job job: completedJobs) {
+ Pair<List<POStore>, Path> pair = jobStoreMap.get(job);
+ if (pair != null && pair.second != null) {
+ Path tmp = pair.second;
+ Path abs = new Path(tmp, "abs");
+ Path rel = new Path(tmp, "rel");
+ FileSystem fs = tmp.getFileSystem(conf);
- if (fs.exists(abs)) {
- moveResults(abs, abs.toUri().getPath(), fs);
- }
-
- if (fs.exists(rel)) {
- moveResults(rel, rel.toUri().getPath()+"/", fs);
+ if (fs.exists(abs)) {
+ moveResults(abs, abs.toUri().getPath(), fs);
+ }
+
+ if (fs.exists(rel)) {
+ moveResults(rel, rel.toUri().getPath()+"/", fs);
+ }
}
}
- tmpPath = new LinkedList<Path>();
}
/**
@@ -171,19 +188,16 @@
return new Path(pathStr);
}
- private void makeTmpPath() throws IOException {
- if (curTmpPath != null) {
- tmpPath.add(curTmpPath);
- }
-
+ private Path makeTmpPath() throws IOException {
+ Path tmpPath = null;
for (int tries = 0;;) {
try {
- curTmpPath =
+ tmpPath =
new Path(FileLocalizer
.getTemporaryPath(null, pigContext).toString());
- FileSystem fs = curTmpPath.getFileSystem(conf);
- curTmpPath = curTmpPath.makeQualified(fs);
- fs.mkdirs(curTmpPath);
+ FileSystem fs = tmpPath.getFileSystem(conf);
+ tmpPath = tmpPath.makeQualified(fs);
+ fs.mkdirs(tmpPath);
break;
} catch (IOException ioe) {
if (++tries==100) {
@@ -191,6 +205,7 @@
}
}
}
+ return tmpPath;
}
/**
@@ -220,7 +235,7 @@
List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
roots.addAll(plan.getRoots());
for (MapReduceOper mro: roots) {
- jobCtrl.addJob(new Job(getJobConf(mro, conf, pigContext)));
+ jobCtrl.addJob(getJob(mro, conf, pigContext));
plan.remove(mro);
}
} catch (JobCreationException jce) {
@@ -235,7 +250,7 @@
}
/**
- * The method that creates the JobConf corresponding to a MapReduceOper.
+ * The method that creates the Job corresponding to a MapReduceOper.
* The assumption is that
* every MapReduceOper will have a load and a store. The JobConf removes
* the load operator and serializes the input filespec so that PigInputFormat can
@@ -253,13 +268,15 @@
* @param mro - The MapReduceOper for which the JobConf is required
* @param conf - the Configuration object from which JobConf is built
* @param pigContext - The PigContext passed on from execution engine
- * @return JobConf corresponding to mro
+ * @return Job corresponding to mro
* @throws JobCreationException
*/
- private JobConf getJobConf(MapReduceOper mro, Configuration conf, PigContext pigContext) throws JobCreationException{
+ private Job getJob(MapReduceOper mro, Configuration conf, PigContext pigContext) throws JobCreationException{
JobConf jobConf = new JobConf(conf);
ArrayList<Pair<FileSpec, Boolean>> inp = new ArrayList<Pair<FileSpec, Boolean>>();
ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
+ ArrayList<POStore> storeLocations = new ArrayList<POStore>();
+ Path tmpLocation = null;
//Set the User Name for this job. This will be
//used as the working directory
@@ -320,6 +337,14 @@
List<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
List<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan);
+ for (POStore st: mapStores) {
+ storeLocations.add(st);
+ }
+
+ for (POStore st: reduceStores) {
+ storeLocations.add(st);
+ }
+
if (mapStores.size() + reduceStores.size() == 1) { // single store case
log.info("Setting up single store job");
@@ -370,21 +395,22 @@
else { // multi store case
log.info("Setting up multi store job");
- makeTmpPath();
- FileSystem fs = curTmpPath.getFileSystem(conf);
+ tmpLocation = makeTmpPath();
+
+ FileSystem fs = tmpLocation.getFileSystem(conf);
for (POStore st: mapStores) {
Path tmpOut = new Path(
- curTmpPath,
+ tmpLocation,
PlanHelper.makeStoreTmpPath(st.getSFile().getFileName()));
fs.mkdirs(tmpOut);
}
jobConf.setOutputFormat(PigOutputFormat.class);
- FileOutputFormat.setOutputPath(jobConf, curTmpPath);
+ FileOutputFormat.setOutputPath(jobConf, tmpLocation);
jobConf.set("pig.streaming.log.dir",
- new Path(curTmpPath, LOG_DIR).toString());
- jobConf.set("pig.streaming.task.output.dir", curTmpPath.toString());
+ new Path(tmpLocation, LOG_DIR).toString());
+ jobConf.set("pig.streaming.task.output.dir", tmpLocation.toString());
}
// store map key type
@@ -475,7 +501,10 @@
ObjectSerializer.serialize(mro.getSortOrder()));
}
}
- return jobConf;
+
+ Job job = new Job(jobConf);
+ jobStoreMap.put(job,new Pair(storeLocations, tmpLocation));
+ return job;
} catch (JobCreationException jce) {
throw jce;
} catch(Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Fri May 15 17:14:35 2009
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.LinkedList;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -48,6 +49,7 @@
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.tools.pigstats.PigStats;
public abstract class Launcher {
@@ -58,6 +60,9 @@
boolean pigException = false;
boolean outOfMemory = false;
final String OOM_ERR = "OutOfMemoryError";
+
+ protected List<FileSpec> succeededStores = null;
+ protected List<FileSpec> failedStores = null;
protected Launcher(){
totalHadoopTimeSpent = 0;
@@ -65,7 +70,36 @@
if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
newLine = "\r\n";
}
+ reset();
+ }
+
+ /**
+ * Returns a list of locations of results that have been
+ * successfully completed.
+ * @return A list of filespecs that correspond to the locations of
+ * the successful stores.
+ */
+ public List<FileSpec> getSucceededFiles() {
+ return succeededStores;
}
+
+ /**
+ * Returns a list of locations of results that have failed.
+ * @return A list of filespecs that correspond to the locations of
+ * the failed stores.
+ */
+ public List<FileSpec> getFailedFiles() {
+ return failedStores;
+ }
+
+ /**
+ * Resets the state after a launch
+ */
+ public void reset() {
+ succeededStores = new LinkedList<FileSpec>();
+ failedStores = new LinkedList<FileSpec>();
+ }
+
/**
* Method to launch pig for hadoop either for a cluster's
* job tracker or for a local job runner. The only difference
@@ -127,14 +161,14 @@
JobID MRJobID = job.getAssignedJobID();
String jobMessage = job.getMessage();
if(MRJobID == null) {
- try {
+ try {
throw getExceptionFromString(jobMessage);
} catch (Exception e) {
//just get the first line in the message and log the rest
String firstLine = getFirstLineFromMessage(jobMessage);
-
+
LogUtils.writeLog(new Exception(jobMessage), pigContext.getProperties().getProperty("pig.logfile"),
- log, false, null, false, false);
+ log, false, null, false, false);
int errCode = 2997;
String msg = "Unable to recreate exception from backend error: " + firstLine;
throw new ExecException(msg, errCode, PigException.BUG, e);
@@ -179,32 +213,32 @@
//this comparison is in place till Hadoop 0.20 provides methods to query
//job status
if(reports[i].getProgress() != successfulProgress) {
- jobFailed = true;
+ jobFailed = true;
}
Set<String> errorMessageSet = new HashSet<String>();
for (int j = 0; j < msgs.length; j++) {
- if(!errorMessageSet.contains(msgs[j])) {
- errorMessageSet.add(msgs[j]);
- if (errNotDbg) {
- //errNotDbg is used only for failed jobs
- //keep track of all the unique exceptions
+ if(!errorMessageSet.contains(msgs[j])) {
+ errorMessageSet.add(msgs[j]);
+ if (errNotDbg) {
+ //errNotDbg is used only for failed jobs
+ //keep track of all the unique exceptions
try {
Exception e = getExceptionFromString(msgs[j]);
exceptions.add(e);
} catch (Exception e1) {
String firstLine = getFirstLineFromMessage(msgs[j]);
LogUtils.writeLog(new Exception(msgs[j]), pigContext.getProperties().getProperty("pig.logfile"),
- log, false, null, false, false);
+ log, false, null, false, false);
int errCode = 2997;
String msg = "Unable to recreate exception from backed error: " + firstLine;
throw new ExecException(msg, errCode, PigException.BUG, e1);
}
- } else {
- log.debug("Error message from task (" + type + ") " +
- reports[i].getTaskID() + msgs[j]);
- }
- }
- }
+ } else {
+ log.debug("Error message from task (" + type + ") " +
+ reports[i].getTaskID() + msgs[j]);
+ }
+ }
+ }
}
//if its a failed job then check if there is more than one exception
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri May 15 17:14:35 2009
@@ -229,7 +229,6 @@
List<POStore> stores = PlanHelper.getStores(plan);
for (POStore store: stores) {
- FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
compile(store);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri May 15 17:14:35 2009
@@ -50,12 +50,15 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.CompilationMessageCollector.Message;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.util.ConfigurationValidator;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.tools.pigstats.PigStats;
/**
@@ -68,16 +71,31 @@
//used to track the exception thrown by the job control which is run in a separate thread
private Exception jobControlException = null;
private boolean aggregateWarning = false;
-
+ private Map<FileSpec, Exception> failureMap;
+
+ /**
+ * Get the exception that caused a failure on the backend for a
+ * store location (if any).
+ */
+ public Exception getError(FileSpec spec) {
+ return failureMap.get(spec);
+ }
+
+ @Override
+ public void reset() {
+ failureMap = new HashMap<FileSpec, Exception>();
+ super.reset();
+ }
+
@Override
public PigStats launchPig(PhysicalPlan php,
- String grpName,
- PigContext pc) throws PlanException,
- VisitorException,
- IOException,
- ExecException,
- JobCreationException,
- Exception {
+ String grpName,
+ PigContext pc) throws PlanException,
+ VisitorException,
+ IOException,
+ ExecException,
+ JobCreationException,
+ Exception {
long sleepTime = 500;
aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
MROperPlan mrp = compile(php, pc);
@@ -140,8 +158,29 @@
numMRJobsCompl += numMRJobsCurrent;
failedJobs.addAll(jc.getFailedJobs());
- succJobs.addAll(jc.getSuccessfulJobs());
- jcc.moveResults();
+
+ if (!failedJobs.isEmpty()
+ && "true".equalsIgnoreCase(
+ pc.getProperties().getProperty("stop.on.failure","false"))) {
+ int errCode = 6017;
+ StringBuilder msg = new StringBuilder("Execution failed, while processing ");
+
+ for (Job j: failedJobs) {
+ List<POStore> sts = jcc.getStores(j);
+ for (POStore st: sts) {
+ msg.append(st.getSFile().getFileName());
+ msg.append(", ");
+ }
+ }
+
+ throw new ExecException(msg.substring(0,msg.length()-2),
+ errCode, PigException.REMOTE_ENVIRONMENT);
+ }
+
+ List<Job> jobs = jc.getSuccessfulJobs();
+ jcc.moveResults(jobs);
+ succJobs.addAll(jobs);
+
stats.setJobClient(jobClient);
stats.setJobControl(jc);
@@ -150,40 +189,76 @@
jc.stop();
}
+ log.info( "100% complete");
+
+ boolean failed = false;
// Look to see if any jobs failed. If so, we need to report that.
if (failedJobs != null && failedJobs.size() > 0) {
- log.error("Map reduce job failed");
+ log.error(failedJobs.size()+" map reduce job(s) failed!");
+ Exception backendException = null;
+
for (Job fj : failedJobs) {
- getStats(fj, jobClient, true, pc);
+
+ try {
+ getStats(fj, jobClient, true, pc);
+ } catch (Exception e) {
+ backendException = e;
+ }
+
+ List<POStore> sts = jcc.getStores(fj);
+ for (POStore st: sts) {
+ if (!st.isTmpStore()) {
+ failedStores.add(st.getSFile());
+ failureMap.put(st.getSFile(), backendException);
+ }
+
+ FileLocalizer.registerDeleteOnFail(st.getSFile().getFileName(), pc);
+ log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
+ }
}
- return null;
+ failed = true;
}
Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
if(succJobs!=null) {
for(Job job : succJobs){
+ List<POStore> sts = jcc.getStores(job);
+ for (POStore st: sts) {
+ if (!st.isTmpStore()) {
+ succeededStores.add(st.getSFile());
+ }
+ log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
+ }
getStats(job,jobClient, false, pc);
if(aggregateWarning) {
- computeWarningAggregate(job, jobClient, warningAggMap);
+ computeWarningAggregate(job, jobClient, warningAggMap);
}
}
}
if(aggregateWarning) {
- CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
+ CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
}
if(stats.getPigStats().get(stats.getLastJobID()) == null)
- log
- .warn("Jobs not found in the JobClient. Please try to use Local, Hadoop Distributed or Hadoop MiniCluster modes instead of Hadoop LocalExecution");
+ log.warn("Jobs not found in the JobClient. Please try to use Local, Hadoop Distributed or Hadoop MiniCluster modes instead of Hadoop LocalExecution");
else {
log.info("Records written : " + stats.getRecordsWritten());
log.info("Bytes written : " + stats.getBytesWritten());
}
- log.info( "100% complete");
- log.info("Success!");
+ if (!failed) {
+ log.info("Success!");
+ } else {
+ if (succJobs != null && succJobs.size() > 0) {
+ log.info("Some jobs have failed!");
+ } else {
+ log.info("Failed!");
+ }
+ }
+ jcc.reset();
+
return stats;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Fri May 15 17:14:35 2009
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.Properties;
import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
@@ -143,31 +144,37 @@
}
}
- public ExecJob execute(PhysicalPlan plan, String jobName)
- throws ExecException {
+ public List<ExecJob> execute(PhysicalPlan plan, String jobName)
+ throws ExecException {
try {
PhysicalOperator leaf = (PhysicalOperator) plan.getLeaves().get(0);
- FileSpec spec = null;
if (!(leaf instanceof POStore)) {
String scope = leaf.getOperatorKey().getScope();
POStore str = new POStore(new OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
- pigContext).toString(), new FuncSpec(BinStorage.class
- .getName()));
+ FileSpec spec = new FileSpec(FileLocalizer.getTemporaryPath(null,pigContext).toString(),
+ new FuncSpec(BinStorage.class.getName()));
str.setSFile(spec);
plan.addAsLeaf(str);
- } else {
- spec = ((POStore) leaf).getSFile();
}
LocalPigLauncher launcher = new LocalPigLauncher();
+
+ List<ExecJob> jobs = new ArrayList<ExecJob>();
+
PigStats stats = launcher.launchPig(plan, jobName, pigContext);
- if (stats != null)
- return new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext,
- spec, stats);
- else
- return new LocalJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
+ for (FileSpec fspec: launcher.getSucceededFiles()) {
+ jobs.add(new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, fspec, stats));
+ }
+
+ for (FileSpec fspec: launcher.getFailedFiles()) {
+ jobs.add(new LocalJob(ExecJob.JOB_STATUS.FAILED, pigContext, fspec, stats));
+ }
+
+ launcher.reset();
+
+ return jobs;
+
} catch (Exception e) {
// There are a lot of exceptions thrown by the launcher. If this
// is an ExecException, just let it through. Else wrap it.
@@ -178,7 +185,7 @@
}
}
- public LocalJob submit(PhysicalPlan plan, String jobName)
+ public List<ExecJob> submit(PhysicalPlan plan, String jobName)
throws ExecException {
throw new UnsupportedOperationException();
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java Fri May 15 17:14:35 2009
@@ -158,4 +158,8 @@
public void getSTDError(OutputStream error) throws ExecException {
throw new UnsupportedOperationException();
}
+
+ public Exception getException() {
+ return null;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Fri May 15 17:14:35 2009
@@ -39,9 +39,11 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.PigException;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -95,7 +97,7 @@
log.info("running store with dependencies");
POStore[] st = new POStore[1];
st[0] = op;
- failedJobs += runPipeline(st);
+ failedJobs += runPipeline(st, pc);
for (PhysicalOperator suc: sucs) {
php.disconnect(op, suc);
}
@@ -104,13 +106,21 @@
}
// The remaining stores can be run together.
- failedJobs += runPipeline(stores.toArray(new POStore[0]));
+ failedJobs += runPipeline(stores.toArray(new POStore[0]), pc);
stats.accumulateStats();
UDFFinishVisitor finisher = new UDFFinishVisitor(php, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(php));
finisher.visit();
+ for (FileSpec spec: failedStores) {
+ log.info("Failed to produce result in: \""+spec.getFileName()+"\"");
+ }
+
+ for (FileSpec spec: succeededStores) {
+ log.info("Successfully stored result in: \""+spec.getFileName()+"\"");
+ }
+
if (failedJobs == 0) {
log.info("Records written : " + stats.getRecordsWritten());
log.info("Bytes written : " + stats.getBytesWritten());
@@ -124,9 +134,8 @@
return null;
}
-
- private int runPipeline(POStore[] leaves) throws IOException, ExecException {
+ private int runPipeline(POStore[] leaves, PigContext pc) throws IOException, ExecException {
BitSet bs = new BitSet(leaves.length);
int failed = 0;
while(true) {
@@ -146,9 +155,20 @@
leaves[i].cleanUp();
leaves[i].tearDown();
failed++;
- // fallthrough
+ failedStores.add(leaves[i].getSFile());
+ if ("true".equalsIgnoreCase(
+ pc.getProperties().getProperty("stop.on.failure","false"))) {
+ int errCode = 6017;
+ String msg = "Execution failed, while processing "
+ + leaves[i].getSFile().getFileName();
+
+ throw new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT);
+ }
+ bs.set(i);
+ break;
case POStatus.STATUS_EOP:
leaves[i].tearDown();
+ succeededStores.add(leaves[i].getSFile());
// fallthrough
default:
bs.set(i);
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java Fri May 15 17:14:35 2009
@@ -81,11 +81,11 @@
}
}
- public void exec() throws Throwable {
+ public int[] exec() throws Throwable {
boolean verbose = "true".equalsIgnoreCase(pig.getPigContext().getProperties().getProperty("verbose"));
try {
parser.setInteractive(false);
- parser.parseStopOnError();
+ return parser.parseStopOnError();
} catch (Throwable t) {
LogUtils.writeLog(t, pig.getPigContext().getProperties().getProperty("pig.logfile"), log, verbose);
throw (t);
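
With stop.on.failure set, both launchers abort with error code 6017 and a message naming the stores that could not be processed. A sketch of telling that deliberate abort apart from other failures, patterned on testStopOnFailure in TestGrunt below (the wrapper method is hypothetical):

    import org.apache.pig.PigException;
    import org.apache.pig.tools.grunt.Grunt;

    public class StopOnFailureCheck {
        static void run(Grunt grunt) throws Throwable {
            try {
                grunt.exec(); // now returns {succeeded, failed} counts
            } catch (PigException e) {
                if (e.getErrorCode() != 6017) {
                    throw e;
                }
                // 6017: aborted by -F/stop.on.failure after the first failed job
                System.err.println("Stopped on failure: " + e.getMessage());
            }
        }
    }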
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri May 15 17:14:35 2009
@@ -58,6 +58,8 @@
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.tools.pigscript.parser.ParseException;
@@ -107,7 +109,22 @@
}
if (!mLoadOnly) {
- mPigServer.executeBatch();
+ List<ExecJob> jobs = mPigServer.executeBatch();
+ for(ExecJob job: jobs) {
+ if (job.getStatus() == ExecJob.JOB_STATUS.FAILED) {
+ mNumFailedJobs++;
+ if (job.getException() != null) {
+ LogUtils.writeLog(
+ job.getException(),
+ mPigServer.getPigContext().getProperties().getProperty("pig.logfile"),
+ log,
+ "true".equalsIgnoreCase(mPigServer.getPigContext().getProperties().getProperty("verbose")));
+ }
+ }
+ else {
+ mNumSucceededJobs++;
+ }
+ }
}
}
}
@@ -118,9 +135,9 @@
}
}
- public void parseStopOnError() throws IOException, ParseException
+ public int[] parseStopOnError() throws IOException, ParseException
{
- parseStopOnError(false);
+ return parseStopOnError(false);
}
/**
@@ -130,7 +147,7 @@
*
* @throws IOException, ParseException
*/
- public void parseStopOnError(boolean sameBatch) throws IOException, ParseException
+ public int[] parseStopOnError(boolean sameBatch) throws IOException, ParseException
{
if (mPigServer == null) {
throw new IllegalStateException();
@@ -156,6 +173,8 @@
discardBatch();
}
}
+ int [] res = { mNumSucceededJobs, mNumFailedJobs };
+ return res;
}
public void setLoadOnly(boolean loadOnly)
@@ -733,4 +752,6 @@
private boolean mDone;
private boolean mLoadOnly;
private ExplainState mExplain;
+ private int mNumFailedJobs;
+ private int mNumSucceededJobs;
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java Fri May 15 17:14:35 2009
@@ -589,4 +589,100 @@
grunt.exec();
}
+
+ @Test
+ public void testKeepGoing() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd =
+ "rmf bar;"
+ +"rmf foo;"
+ +"rmf baz;"
+ +"A = load 'file:test/org/apache/pig/test/data/passwd';"
+ +"B = foreach A generate 1;"
+ +"C = foreach A generate 0/0;"
+ +"store B into 'foo';"
+ +"store C into 'bar';"
+ +"A = load 'file:test/org/apache/pig/test/data/passwd';"
+ +"B = stream A through `false`;"
+ +"store B into 'baz';"
+ +"cat bar;";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
+
+ @Test
+ public void testKeepGoigFailed() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd =
+ "rmf bar;"
+ +"rmf foo;"
+ +"rmf baz;"
+ +"A = load 'file:test/org/apache/pig/test/data/passwd';"
+ +"B = foreach A generate 1;"
+ +"C = foreach A generate 0/0;"
+ +"store B into 'foo';"
+ +"store C into 'bar';"
+ +"A = load 'file:test/org/apache/pig/test/data/passwd';"
+ +"B = stream A through `false`;"
+ +"store B into 'baz';"
+ +"cat baz;";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ boolean caught = false;
+ try {
+ grunt.exec();
+ } catch (Exception e) {
+ caught = true;
+ assertTrue(e.getMessage().contains("baz does not exist"));
+ }
+ assertTrue(caught);
+ }
+
+ @Test
+ public void testStopOnFailure() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+ context.getProperties().setProperty("stop.on.failure", ""+true);
+
+ String strCmd =
+ "rmf bar;\n"
+ +"rmf foo;\n"
+ +"rmf baz;\n"
+ +"copyFromLocal test/org/apache/pig/test/data/passwd pre;\n"
+ +"A = load 'file:test/org/apache/pig/test/data/passwd';\n"
+ +"B = stream A through `false`;\n"
+ +"store B into 'bar' using BinStorage();\n"
+ +"A = load 'bar';\n"
+ +"store A into 'foo';\n"
+ +"cp pre done;\n";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ boolean caught = false;
+ try {
+ grunt.exec();
+ } catch (PigException e) {
+ caught = true;
+ assertTrue(e.getErrorCode() == 6017);
+ }
+
+ assertFalse(server.existsFile("done"));
+ assertTrue(caught);
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Fri May 15 17:14:35 2009
@@ -22,6 +22,7 @@
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import junit.framework.Assert;
import junit.framework.TestCase;
@@ -30,6 +31,7 @@
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.util.ExecTools;
+import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
@@ -116,14 +118,15 @@
myPig.registerQuery("c = group b by gid;");
myPig.registerQuery("store c into '/tmp/output2';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertTrue(jobs.size() == 2);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
}
}
-
+
@Test
public void testMultiQueryWithTwoLoads2() {
@@ -1665,5 +1668,4 @@
Assert.fail();
}
}
-
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=775239&r1=775238&r2=775239&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Fri May 15 17:14:35 2009
@@ -20,6 +20,7 @@
import java.io.StringReader;
import java.io.IOException;
import java.io.File;
+import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
@@ -550,11 +551,15 @@
}
private boolean executePlan(PhysicalPlan pp) throws IOException {
+ boolean failed = true;
FileLocalizer.clearDeleteOnFail();
- ExecJob job = myPig.getPigContext().getExecutionEngine().execute(pp, "execute");
- boolean failed = (job.getStatus() == ExecJob.JOB_STATUS.FAILED);
- if (failed) {
- FileLocalizer.triggerDeleteOnFail();
+ List<ExecJob> jobs = myPig.getPigContext().getExecutionEngine().execute(pp, "execute");
+ for (ExecJob job: jobs) {
+ failed = (job.getStatus() == ExecJob.JOB_STATUS.FAILED);
+ if (failed) {
+ FileLocalizer.triggerDeleteOnFail();
+ break;
+ }
}
return !failed;
}