Posted to commits@pig.apache.org by rd...@apache.org on 2010/08/30 19:40:12 UTC
svn commit: r990877 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/tools/grunt/ src/org/apache/pig/tools/pigstats/
test/org/apache/pig/test/
Author: rding
Date: Mon Aug 30 17:40:11 2010
New Revision: 990877
URL: http://svn.apache.org/viewvc?rev=990877&view=rev
Log:
PIG-1343: pig_log file missing even though Main tells it is creating one and an M/R job fails
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 17:40:11 2010
@@ -171,6 +171,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1343: pig_log file missing even though Main tells it is creating one and
+an M/R job fails (nrai via rding)
+
PIG-1482: Pig gets confused when more than one loader is involved (xuefuz via thejas)
PIG-1579: Intermittent unit test failure for TestScriptUDF.testPythonScriptUDFNullInputOutput (daijy)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Mon Aug 30 17:40:11 2010
@@ -103,10 +103,12 @@ import org.apache.pig.pen.ExampleGenerat
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
+import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
/**
@@ -323,13 +325,20 @@ public class PigServer {
public List<ExecJob> executeBatch() throws FrontendException, ExecException {
PigStats stats = executeBatchEx();
LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
- for (OutputStats output : stats.getOutputStats()) {
- if (output.isSuccessful()) {
- jobs.add(new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, output
- .getPOStore(), output.getAlias(), stats));
- } else {
- jobs.add(new HJob(HJob.JOB_STATUS.FAILED, pigContext, output
- .getPOStore(), output.getAlias(), stats));
+ JobGraph jGraph = stats.getJobGraph();
+ Iterator<JobStats> iter = jGraph.iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+ for (OutputStats output : js.getOutputs()) {
+ if (js.isSuccessful()) {
+ jobs.add(new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, output
+ .getPOStore(), output.getAlias(), stats));
+ } else {
+ HJob hjob = new HJob(HJob.JOB_STATUS.FAILED, pigContext, output
+ .getPOStore(), output.getAlias(), stats);
+ hjob.setException(js.getException());
+ jobs.add(hjob);
+ }
}
}
return jobs;
@@ -338,7 +347,7 @@ public class PigServer {
private PigStats executeBatchEx() throws FrontendException, ExecException {
if (!isMultiQuery) {
// ignore if multiquery is off
- return PigStatsUtil.getEmptyPigStats();
+ return PigStats.get();
}
if (currDAG == null || !isBatchOn()) {
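With this change, executeBatch() no longer builds ExecJobs from per-output success alone: it walks the JobGraph, and each failed job's ExecJob now carries the backend exception via JobStats.getException(). A minimal caller sketch (hypothetical script and paths, assuming the batch API shown above):

    import java.util.List;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.backend.executionengine.ExecJob;

    public class BatchErrorDemo {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.setBatchOn();
            pig.registerQuery("A = load 'input' as (a0:int);");  // hypothetical input
            pig.registerQuery("store A into 'output';");
            List<ExecJob> jobs = pig.executeBatch();
            for (ExecJob job : jobs) {
                if (job.getStatus() == ExecJob.JOB_STATUS.FAILED
                        && job.getException() != null) {
                    // The exception now comes from JobStats, so the root
                    // cause of the backend failure is visible to callers.
                    job.getException().printStackTrace();
                }
            }
        }
    }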
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Mon Aug 30 17:40:11 2010
@@ -178,47 +178,37 @@ public abstract class Launcher {
for (int i = 0; i < reports.length; i++) {
String msgs[] = reports[i].getDiagnostics();
ArrayList<Exception> exceptions = new ArrayList<Exception>();
- boolean jobFailed = false;
- float successfulProgress = 1.0f;
- if (msgs.length > 0) {
- //if the progress reported is not 1.0f then the map or reduce job failed
- //this comparison is in place till Hadoop 0.20 provides methods to query
- //job status
- if(reports[i].getProgress() != successfulProgress) {
- jobFailed = true;
- }
- Set<String> errorMessageSet = new HashSet<String>();
- for (int j = 0; j < msgs.length; j++) {
- if(!errorMessageSet.contains(msgs[j])) {
- errorMessageSet.add(msgs[j]);
- if (errNotDbg) {
- //errNotDbg is used only for failed jobs
- //keep track of all the unique exceptions
- try {
- LogUtils.writeLog("Backend error message", msgs[j],
- pigContext.getProperties().getProperty("pig.logfile"),
- log);
- Exception e = getExceptionFromString(msgs[j]);
- exceptions.add(e);
- } catch (Exception e1) {
- String firstLine = getFirstLineFromMessage(msgs[j]);
- int errCode = 2997;
- String msg = "Unable to recreate exception from backed error: " + firstLine;
- throw new ExecException(msg, errCode, PigException.BUG);
- }
- } else {
- log.debug("Error message from task (" + type + ") " +
- reports[i].getTaskID() + msgs[j]);
+ Set<String> errorMessageSet = new HashSet<String>();
+ for (int j = 0; j < msgs.length; j++) {
+ if(!errorMessageSet.contains(msgs[j])) {
+ errorMessageSet.add(msgs[j]);
+ if (errNotDbg) {
+ //errNotDbg is used only for failed jobs
+ //keep track of all the unique exceptions
+ try {
+ LogUtils.writeLog("Backend error message", msgs[j],
+ pigContext.getProperties().getProperty("pig.logfile"),
+ log);
+ Exception e = getExceptionFromString(msgs[j]);
+ exceptions.add(e);
+ } catch (Exception e1) {
+ String firstLine = getFirstLineFromMessage(msgs[j]);
+ int errCode = 2997;
+ String msg = "Unable to recreate exception from backed error: " + firstLine;
+ throw new ExecException(msg, errCode, PigException.BUG);
}
+ } else {
+ log.debug("Error message from task (" + type + ") " +
+ reports[i].getTaskID() + msgs[j]);
}
}
- }
+ }
//if its a failed job then check if there is more than one exception
//more than one exception implies possibly different kinds of failures
//log all the different failures and throw the exception corresponding
//to the first failure
- if(jobFailed) {
+ if (errNotDbg) {
if(exceptions.size() > 1) {
for(int j = 0; j < exceptions.size(); ++j) {
String headerMessage = "Error message from task (" + type + ") " + reports[i].getTaskID();
@@ -233,7 +223,7 @@ public abstract class Launcher {
throw new ExecException(msg, errCode, PigException.BUG);
}
}
- }
+ }
}
/**
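The net effect of this hunk: the old progress-based jobFailed heuristic (progress != 1.0f meant failure, a workaround for Hadoop pre-0.20 lacking a job-status query) is gone, and both exception collection and the final throw are gated on the errNotDbg flag the caller sets for failed jobs. A condensed sketch of the new control flow (names simplified; the plain Exception stands in for getExceptionFromString):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class DiagnosticsSketch {
        static void handleDiagnostics(String[] msgs, boolean errNotDbg) throws Exception {
            Set<String> seen = new HashSet<String>();
            List<Exception> exceptions = new ArrayList<Exception>();
            for (String msg : msgs) {
                if (seen.add(msg)) {                      // de-duplicate messages
                    if (errNotDbg) {
                        exceptions.add(new Exception(msg)); // parse and keep
                    } else {
                        System.err.println("DEBUG: " + msg);
                    }
                }
            }
            // The throw is now gated on errNotDbg, not on a progress heuristic.
            if (errNotDbg && !exceptions.isEmpty()) {
                throw exceptions.get(0);                  // first failure wins
            }
        }
    }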
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Mon Aug 30 17:40:11 2010
@@ -56,6 +56,7 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
@@ -112,16 +113,16 @@ public class GruntParser extends PigScri
for(ExecJob job: jobs) {
if (job.getStatus() == ExecJob.JOB_STATUS.FAILED) {
mNumFailedJobs++;
- if (job.getException() != null) {
- LogUtils.writeLog(
- job.getException(),
- mPigServer.getPigContext().getProperties().getProperty("pig.logfile"),
- log,
- "true".equalsIgnoreCase(mPigServer.getPigContext().getProperties().getProperty("verbose")),
- "Pig Stack Trace");
- }
- }
- else {
+ Exception exp = (job.getException() != null) ? job.getException()
+ : new ExecException(
+ "Job failed, hadoop does not return any error message",
+ 2244);
+ LogUtils.writeLog(exp,
+ mPigServer.getPigContext().getProperties().getProperty("pig.logfile"),
+ log,
+ "true".equalsIgnoreCase(mPigServer.getPigContext().getProperties().getProperty("verbose")),
+ "Pig Stack Trace");
+ } else {
mNumSucceededJobs++;
}
}
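This is the heart of the PIG-1343 fix on the front end: a failed job that arrives without any exception from Hadoop now gets a synthetic ExecException (error code 2244), so LogUtils.writeLog always produces the pig_log file that Main announces. The same call shape in isolation (hypothetical log path; message and code 2244 are taken verbatim from the change above):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.impl.util.LogUtils;

    public class FallbackLogDemo {
        private static final Log log = LogFactory.getLog(FallbackLogDemo.class);

        public static void main(String[] args) {
            // Placeholder logged when Hadoop supplies no diagnostic.
            ExecException fallback = new ExecException(
                    "Job failed, hadoop does not return any error message", 2244);
            LogUtils.writeLog(fallback, "/tmp/pig_demo.log", log, false,
                    "Pig Stack Trace");
        }
    }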
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon Aug 30 17:40:11 2010
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -267,6 +268,14 @@ public final class PigStats {
}
/**
+ * Returns the properties associated with the script
+ */
+ public Properties getPigProperties() {
+ if (pigContext == null) return null;
+ return pigContext.getProperties();
+ }
+
+ /**
* Returns the DAG of the MR jobs spawned by the script
*/
public JobGraph getJobGraph() {
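The new getPigProperties() accessor exposes the script's effective configuration through PigStats, which is what the tests below use to locate pig.logfile. A hedged usage sketch (hypothetical script path):

    import java.util.Properties;
    import org.apache.pig.PigRunner;
    import org.apache.pig.tools.pigstats.PigStats;

    public class LogFileLookup {
        public static void main(String[] args) {
            // Hypothetical failing script; -x local matches the tests below.
            String[] pigArgs = { "-x", "local", "script.pig" };
            PigStats stats = PigRunner.run(pigArgs, null);
            Properties props = stats.getPigProperties();
            // getPigProperties() returns null if no PigContext is attached.
            if (props != null) {
                System.out.println("pig.logfile = " + props.getProperty("pig.logfile"));
            }
        }
    }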
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Mon Aug 30 17:40:11 2010
@@ -43,6 +43,7 @@ import org.apache.pig.tools.pigstats.Pig
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -73,6 +74,57 @@ public class TestPigRunner {
cluster.shutDown();
}
+ @Before
+ public void setUp() {
+ deleteAll(new File(OUTPUT_FILE));
+ }
+
+ @Test
+ public void testErrorLogFile() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w.println("B = foreach A generate StringSize(a0);");
+ w.println("store B into '" + OUTPUT_FILE + "';");
+ w.close();
+
+ try {
+ String[] args = { "-x", "local", PIG_FILE };
+ PigStats stats = PigRunner.run(args, null);
+
+ assertTrue(!stats.isSuccessful());
+
+ Properties props = stats.getPigProperties();
+ String logfile = props.getProperty("pig.logfile");
+ File f = new File(logfile);
+ assertTrue(f.exists());
+ } finally {
+ new File(PIG_FILE).delete();
+ }
+ }
+
+ @Test
+ public void testErrorLogFile2() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w.println("B = foreach A generate StringSize(a0);");
+ w.println("store B into '" + OUTPUT_FILE + "';");
+ w.close();
+
+ try {
+ String[] args = { "-M", "-x", "local", PIG_FILE };
+ PigStats stats = PigRunner.run(args, null);
+
+ assertTrue(!stats.isSuccessful());
+
+ Properties props = stats.getPigProperties();
+ String logfile = props.getProperty("pig.logfile");
+ File f = new File(logfile);
+ assertTrue(f.exists());
+ } finally {
+ new File(PIG_FILE).delete();
+ }
+ }
+
@Test
public void simpleTest() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -461,4 +513,13 @@ public class TestPigRunner {
}
+ private void deleteAll(File d) {
+ if (!d.exists()) return;
+ if (d.isDirectory()) {
+ for (File f : d.listFiles()) {
+ deleteAll(f);
+ }
+ }
+ d.delete();
+ }
}
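Both new tests fail deliberately at runtime because StringSize is applied to an int field rather than a chararray. They differ only in the -M flag, which disables multi-query optimization, so the fix is exercised on both the multi-query batch path (the PigServer change above) and the single-query path. The @Before deleteAll call removes the output between runs because Hadoop local mode writes a directory rather than a single file, hence the recursive delete helper below.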
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java Mon Aug 30 17:40:11 2010
@@ -40,6 +40,7 @@ import org.apache.pig.impl.util.LogUtils
import org.apache.pig.tools.pigscript.parser.ParseException;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
+import org.junit.Before;
import org.junit.Test;
public class TestPigStats {
@@ -73,29 +74,38 @@ public class TestPigStats {
@Test
public void testPigStatsAlias() throws Exception {
- PigServer pig = new PigServer(ExecType.LOCAL);
- pig.registerQuery("A = load 'input' as (name, age, gpa);");
- pig.registerQuery("B = group A by name;");
- pig.registerQuery("C = foreach B generate group, COUNT(A);");
- pig.registerQuery("D = order C by $1;");
- pig.registerQuery("E = limit D 10;");
- pig.registerQuery("store E into 'output';");
-
- LogicalPlan lp = getLogicalPlan(pig);
- PhysicalPlan pp = pig.getPigContext().getExecutionEngine().compile(lp,
- null);
- MROperPlan mp = getMRPlan(pp, pig.getPigContext());
-
- assertEquals(3, mp.getKeys().size());
-
- MapReduceOper mro = mp.getRoots().get(0);
- assertEquals("A,B,C", getAlias(mro));
-
- mro = mp.getSuccessors(mro).get(0);
- assertEquals("D", getAlias(mro));
-
- mro = mp.getSuccessors(mro).get(0);
- assertEquals("D", getAlias(mro));
+ try {
+ PigServer pig = new PigServer(ExecType.LOCAL);
+ pig.registerQuery("A = load 'input' as (name, age, gpa);");
+ pig.registerQuery("B = group A by name;");
+ pig.registerQuery("C = foreach B generate group, COUNT(A);");
+ pig.registerQuery("D = order C by $1;");
+ pig.registerQuery("E = limit D 10;");
+ pig.registerQuery("store E into 'alias_output';");
+
+ LogicalPlan lp = getLogicalPlan(pig);
+ PhysicalPlan pp = pig.getPigContext().getExecutionEngine().compile(lp,
+ null);
+ MROperPlan mp = getMRPlan(pp, pig.getPigContext());
+
+ assertEquals(3, mp.getKeys().size());
+
+ MapReduceOper mro = mp.getRoots().get(0);
+ assertEquals("A,B,C", getAlias(mro));
+
+ mro = mp.getSuccessors(mro).get(0);
+ assertEquals("D", getAlias(mro));
+
+ mro = mp.getSuccessors(mro).get(0);
+ assertEquals("D", getAlias(mro));
+ } finally {
+ File outputfile = new File("alias_output");
+ if (outputfile.exists()) {
+ // Hadoop Local mode creates a directory
+ // Hence we need to delete a directory recursively
+ deleteDirectory(outputfile);
+ }
+ }
}
private void deleteDirectory( File dir ) {
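The diff section is cut off at deleteDirectory's signature; its body is not shown here. For orientation only, a recursive delete in the same style as deleteAll in TestPigRunner would look like the following method-level sketch (an assumption, not the committed code):

    // Hypothetical reconstruction, mirroring deleteAll above: remove
    // children first, then the now-empty directory itself.
    private void deleteDirectory(File dir) {
        File[] children = dir.listFiles();
        if (children != null) {
            for (File f : children) {
                if (f.isDirectory()) {
                    deleteDirectory(f);
                } else {
                    f.delete();
                }
            }
        }
        dir.delete();
    }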