Posted to commits@pig.apache.org by ol...@apache.org on 2009/02/13 02:59:30 UTC
svn commit: r743952 [1/4] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/
src/org/apache/pig/backend/hadoop/datastorage/
src/org/apache/pig/backend/hadoop/executionengine/ sr...
Author: olga
Date: Fri Feb 13 01:59:27 2009
New Revision: 743952
URL: http://svn.apache.org/viewvc?rev=743952&view=rev
Log:
PIG-590: error handling on the backend (sms via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java
hadoop/pig/trunk/src/org/apache/pig/PigException.java
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SortPartitioner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/builtin/ARITY.java
hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BagSize.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
hadoop/pig/trunk/src/org/apache/pig/builtin/CONCAT.java
hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
hadoop/pig/trunk/src/org/apache/pig/builtin/ConstantSize.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DIFF.java
hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
hadoop/pig/trunk/src/org/apache/pig/builtin/MapSize.java
hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/trunk/src/org/apache/pig/builtin/SIZE.java
hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
hadoop/pig/trunk/src/org/apache/pig/builtin/StringConcat.java
hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/StringSize.java
hadoop/pig/trunk/src/org/apache/pig/builtin/TOKENIZE.java
hadoop/pig/trunk/src/org/apache/pig/builtin/TextLoader.java
hadoop/pig/trunk/src/org/apache/pig/builtin/TupleSize.java
hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java
hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/OptimizerException.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Utils.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPOCogroup.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Feb 13 01:59:27 2009
@@ -419,3 +419,6 @@
PIG-665: Map key type not correctly set (for use when key is null) when
map plan does not have localrearrange (pradeepkth)
+
+ PIG-590: error handling on the backend (sms via olgan)
+
Modified: hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java Fri Feb 13 01:59:27 2009
@@ -38,6 +38,16 @@
/**
* @param className the name of the class for the udf
+ * @param ctorArg the argument for the constructor for the above class
+ */
+ public FuncSpec(String className, String ctorArg) {
+ this.className = className;
+ this.ctorArgs = new String[1];
+ this.ctorArgs[0] = ctorArg;
+ }
+
+ /**
+ * @param className the name of the class for the udf
* @param ctorArgs the arguments for the constructor for the above class
*/
public FuncSpec(String className, String[] ctorArgs) {
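The new single-argument constructor is exercised later in this patch (Launcher.getExceptionFromStrings rebuilds an exception from its class name and message). A minimal usage sketch, assuming a target class with a public single-String constructor such as ExecException; the concrete values are illustrative only:

    import org.apache.pig.FuncSpec;
    import org.apache.pig.impl.PigContext;

    public class FuncSpecExample {
        public static void main(String[] args) throws Exception {
            // Wrap a class name plus a single constructor argument so that
            // PigContext.instantiateFuncFromSpec can build the object reflectively.
            FuncSpec spec = new FuncSpec(
                "org.apache.pig.backend.executionengine.ExecException",
                "Unable to deserialize object.");
            Object o = PigContext.instantiateFuncFromSpec(spec);
            System.out.println(o);   // an ExecException carrying the message
        }
    }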
Modified: hadoop/pig/trunk/src/org/apache/pig/PigException.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigException.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigException.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigException.java Fri Feb 13 01:59:27 2009
@@ -39,6 +39,7 @@
public static final byte BUG = 4;
public static final byte USER_ENVIRONMENT = 8;
public static final byte REMOTE_ENVIRONMENT = 16;
+ public static final byte ERROR = -1;
/**
* A static method to query if an error source is due to
@@ -84,6 +85,25 @@
return ((errSource & REMOTE_ENVIRONMENT) == 0 ? false : true);
}
+ /**
+ * A static method to determine the error source given the error code
+ *
+ * @param errCode - integer error code
+ * @return byte that indicates the error source
+ */
+ public static byte determineErrorSource(int errCode) {
+ if(errCode >= 100 && errCode <= 1999) {
+ return PigException.INPUT;
+ } else if (errCode >= 2000 && errCode <= 2999) {
+ return PigException.BUG;
+ } else if (errCode >= 3000 && errCode <= 4999) {
+ return PigException.USER_ENVIRONMENT;
+ } else if (errCode >= 5000 && errCode <= 6999) {
+ return PigException.REMOTE_ENVIRONMENT;
+ }
+ return PigException.ERROR;
+ }
+
protected int errorCode = 0;
protected byte errorSource = BUG;
protected boolean retriable = false;
@@ -292,4 +312,27 @@
detailedMessage = detailMsg;
}
+ /**
+ * Returns a short description of this throwable.
+ * The result is the concatenation of:
+ * <ul>
+ * <li> the {@linkplain Class#getName() name} of the class of this object
+ * <li> ": " (a colon and a space)
+ * <li> "ERROR " (the string ERROR followed by a a space)
+ * <li> the result of invoking this object's {@link #getErrorCode} method
+ * <li> ": " (a colon and a space)
+ * <li> the result of invoking {@link Throwable#getLocalizedMessage() getLocalizedMessage}
+ * method
+ * </ul>
+ * If <tt>getLocalizedMessage</tt> returns <tt>null</tt>, then just
+ * the class name is returned.
+ *
+ * @return a string representation of this throwable.
+ */
+ @Override
+ public String toString() {
+ String s = getClass().getName();
+ String message = getLocalizedMessage();
+ return (message != null) ? (s + ": " + "ERROR " + getErrorCode() + ": " + message) : s;
+ }
}
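A short sketch of how the new helpers behave, assuming the error-code ranges shown above (100-1999 input, 2000-2999 bug, 3000-4999 user environment, 5000-6999 remote environment) and PigException's existing single-String constructor:

    import org.apache.pig.PigException;

    public class ErrorSourceExample {
        public static void main(String[] args) {
            // 2094 falls in the 2000-2999 range, so it is classified as a BUG.
            System.out.println(PigException.determineErrorSource(2094) == PigException.BUG);   // true

            // Codes outside all known ranges map to the new ERROR constant.
            System.out.println(PigException.determineErrorSource(50) == PigException.ERROR);   // true

            // The overridden toString() renders "<class name>: ERROR <code>: <message>".
            System.out.println(new PigException("Unable to deserialize object."));
        }
    }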
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri Feb 13 01:59:27 2009
@@ -416,6 +416,7 @@
// ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext).toString(), BinStorage.class.getName() + "()");
// invocation of "execute" is synchronous!
+
if (job.getStatus() == JOB_STATUS.COMPLETED) {
return job.getResults();
} else {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java Fri Feb 13 01:59:27 2009
@@ -29,6 +29,7 @@
import java.util.zip.GZIPInputStream;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
import org.apache.pig.Slice;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -75,7 +76,9 @@
try {
loader = (LoadFunc) PigContext.instantiateFuncFromSpec(parser);
} catch (Exception exp) {
- throw new RuntimeException("can't instantiate " + parser);
+ int errCode = 2081;
+ String msg = "Unable to set up the load function.";
+ throw new ExecException(msg, errCode, PigException.BUG, exp);
}
}
fsis = base.asElement(base.getActiveContainer(), file).sopen();
@@ -152,9 +155,9 @@
try {
return ois.readObject();
} catch (ClassNotFoundException cnfe) {
- IOException newE = new IOException(cnfe.getMessage());
- newE.initCause(cnfe);
- throw newE;
+ int errCode = 2094;
+ String msg = "Unable to deserialize object.";
+ throw new ExecException(msg, errCode, PigException.BUG, cnfe);
}
}
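The change above shows the pattern this patch applies throughout the backend: instead of rethrowing a bare RuntimeException or IOException, the original cause is wrapped in an ExecException that carries an error code and an error source. A minimal sketch of that pattern; the helper method and the class being instantiated are made up, and the error code is just one taken from this patch:

    import org.apache.pig.PigException;
    import org.apache.pig.backend.executionengine.ExecException;

    public class WrapExample {
        // Hypothetical helper that mirrors the load-function setup above.
        static Object instantiate(String className) throws ExecException {
            try {
                return Class.forName(className).newInstance();
            } catch (Exception exp) {
                // The code, message and source travel with the exception so the
                // front end can classify and report the failure uniformly.
                int errCode = 2081;                       // example code from this patch
                String msg = "Unable to set up the load function.";
                throw new ExecException(msg, errCode, PigException.BUG, exp);
            }
        }

        public static void main(String[] args) throws ExecException {
            System.out.println(instantiate("java.util.ArrayList"));   // prints []
        }
    }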
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java Fri Feb 13 01:59:27 2009
@@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
import org.apache.pig.Slice;
import org.apache.pig.Slicer;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -79,8 +80,10 @@
}
continue;
}
- } catch (Exception e) {
- throw WrappedIOException.wrap(e);
+ } catch (Exception e) {
+ int errCode = 2099;
+ String msg = "Problem in constructing slices.";
+ throw new ExecException(msg, errCode, PigException.BUG, e);
}
Map<String, Object> stats = fullPath.getStatistics();
long bs = (Long) (stats.get(ElementDescriptor.BLOCK_SIZE_KEY));
@@ -112,7 +115,9 @@
public void validate(DataStorage store, String location) throws IOException {
if (!FileLocalizer.fileExists(location, store)) {
- throw new IOException(store.asElement(location) + " does not exist");
+ int errCode = 2100;
+ String msg = store.asElement(location) + " does not exist.";
+ throw new ExecException(msg, errCode, PigException.BUG);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java Fri Feb 13 01:59:27 2009
@@ -94,8 +94,11 @@
case DataType.TUPLE:
return new NullableTuple((Tuple)o);
- case DataType.MAP:
- throw new RuntimeException("Map not supported as a key type!");
+ case DataType.MAP: {
+ int errCode = 1068;
+ String msg = "Using Map as key not supported.";
+ throw new ExecException(msg, errCode, PigException.INPUT);
+ }
case DataType.NULL:
switch (keyType) {
@@ -135,8 +138,12 @@
NullableTuple ntuple = new NullableTuple();
ntuple.setNull(true);
return ntuple;
- case DataType.MAP:
- throw new RuntimeException("Map not supported as a key type!");
+ case DataType.MAP: {
+ int errCode = 1068;
+ String msg = "Using Map as key not supported.";
+ throw new ExecException(msg, errCode, PigException.INPUT);
+ }
+
}
break;
default:
@@ -182,8 +189,11 @@
case DataType.TUPLE:
wcKey = defTup;
break;
- case DataType.MAP:
- throw new RuntimeException("Map not supported as a key type!");
+ case DataType.MAP: {
+ int errCode = 1068;
+ String msg = "Using Map as key not supported.";
+ throw new ExecException(msg, errCode, PigException.INPUT);
+ }
default:
if (typeToName == null) typeToName = DataType.genTypeToNameMap();
int errCode = 2044;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Fri Feb 13 01:59:27 2009
@@ -31,7 +31,9 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.*;
+import org.apache.pig.backend.executionengine.ExecException;
public abstract class HPath implements ElementDescriptor {
@@ -89,8 +91,10 @@
new Configuration());
if (!result) {
- throw new IOException("Failed to copy from: " + this.toString() +
- " to: " + dstName.toString());
+ int errCode = 2097;
+ String msg = "Failed to copy from: " + this.toString() +
+ " to: " + dstName.toString();
+ throw new ExecException(msg, errCode, PigException.BUG);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java Fri Feb 13 01:59:27 2009
@@ -22,7 +22,9 @@
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.SeekableInputStream;
+import org.apache.pig.backend.executionengine.ExecException;
public class HSeekableInputStream extends SeekableInputStream {
@@ -53,7 +55,9 @@
break;
}
default: {
- throw new IOException("Invalid seek option: " + whence);
+ int errCode = 2098;
+ String msg = "Invalid seek option: " + whence;
+ throw new ExecException(msg, errCode, PigException.BUG);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Feb 13 01:59:27 2009
@@ -261,7 +261,7 @@
if (e instanceof ExecException) throw (ExecException)e;
else {
int errCode = 2043;
- String msg = "Error during execution.";
+ String msg = "Unexpected error during execution.";
throw new ExecException(msg, errCode, PigException.BUG, e);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri Feb 13 01:59:27 2009
@@ -32,6 +32,7 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -71,7 +72,9 @@
p.bindTo(outFileSpec.getFileName(), new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
}catch (Exception e){
- throw new ExecException("Unable to get results for " + outFileSpec, e);
+ int errCode = 2088;
+ String msg = "Unable to get results for: " + outFileSpec;
+ throw new ExecException(msg, errCode, PigException.BUG, e);
}
return new Iterator<Tuple>() {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Fri Feb 13 01:59:27 2009
@@ -26,6 +26,7 @@
import org.apache.pig.PigException;
import org.apache.pig.FuncSpec;
import org.apache.pig.data.DataType;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -48,6 +49,7 @@
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
/**
* Optimize map reduce plans to use the combiner where possible.
@@ -278,7 +280,7 @@
} catch (Exception e) {
int errCode = 2018;
String msg = "Internal error. Unable to introduce the combiner for optimization.";
- throw new VisitorException(msg, errCode, PigException.BUG, e);
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
}
}
@@ -442,7 +444,9 @@
try {
dp.visit();
} catch (VisitorException e) {
- throw new PlanException(e);
+ int errCode = 2073;
+ String msg = "Problem with replacing distinct operator with distinct built-in function.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
}
@@ -461,7 +465,13 @@
// to type Intermediate in combine plan and to type Final in
// the reduce
POUserFunc distinctFunc = (POUserFunc)getDistinctUserFunc(plans[j], leaf);
- distinctFunc.setAlgebraicFunction(funcTypes[j]);
+ try {
+ distinctFunc.setAlgebraicFunction(funcTypes[j]);
+ } catch (ExecException e) {
+ int errCode = 2074;
+ String msg = "Could not configure distinct's algebraic functions in map reduce plan.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
}
}
@@ -474,7 +484,9 @@
try {
new fixMapProjects(mpl).visit();
} catch (VisitorException e) {
- throw new PlanException(e);
+ int errCode = 2089;
+ String msg = "Unable to flag project operator to use single tuple bag.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
}
}
@@ -619,7 +631,13 @@
throw new PlanException(msg, errCode, PigException.BUG);
}
POUserFunc func = (POUserFunc)leaf;
- func.setAlgebraicFunction(type);
+ try {
+ func.setAlgebraicFunction(type);
+ } catch (ExecException e) {
+ int errCode = 2075;
+ String msg = "Could not set algebraic function type.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
}
private void fixUpRearrange(POLocalRearrange rearrange) throws PlanException {
@@ -802,8 +820,9 @@
if(patched) {
// we should not already have been patched since the
// Project-Distinct pair should occur only once
- throw new VisitorException(
- "Unexpected Project-Distinct pair while trying to set up plans for use with combiner.");
+ int errCode = 2076;
+ String msg = "Unexpected Project-Distinct pair while trying to set up plans for use with combiner.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
}
// we have stick in the POUserfunc(org.apache.pig.builtin.Distinct)[DataBag]
// in place of the Project-PODistinct pair
@@ -824,11 +843,13 @@
func.setResultType(DataType.BAG);
mPlan.replace(proj, func);
mPlan.remove(pred);
- // connect the the newly add "func" to
+ // connect the newly added "func" to
// the predecessor to the earlier PODistinct
mPlan.connect(distinctPredecessor, func);
} catch (PlanException e) {
- throw new VisitorException(e);
+ int errCode = 2077;
+ String msg = "Problem with reconfiguring plan to add distinct built-in function.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
patched = true;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 13 01:59:27 2009
@@ -46,6 +46,7 @@
import org.apache.pig.ComparisonFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -605,7 +606,7 @@
int errCode = 6003;
String msg = "Invalid cache specification. " +
"File doesn't exist: " + src;
- throw new PigException(msg, errCode, PigException.USER_ENVIRONMENT);
+ throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
}
// Ship it to the cluster if necessary and add to the
@@ -636,7 +637,7 @@
}
String msg = "Invalid ship specification. " +
"File doesn't exist: " + dst;
- throw new PigException(msg, errCode, errSrc);
+ throw new ExecException(msg, errCode, errSrc);
}
DistributedCache.addCacheFile(dstURI, conf);
} else {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Fri Feb 13 01:59:27 2009
@@ -19,7 +19,12 @@
import java.io.IOException;
import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,6 +35,8 @@
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
@@ -40,14 +47,23 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.grunt.Utils;
public abstract class Launcher {
private static final Log log = LogFactory.getLog(Launcher.class);
- long totalHadoopTimeSpent;
+ long totalHadoopTimeSpent;
+ String newLine = "\n";
+ boolean pigException = false;
+ boolean outOfMemory = false;
+ final String OOM_ERR = "OutOfMemoryError";
protected Launcher(){
totalHadoopTimeSpent = 0;
+ //handle the \r portion of the Windows line separator (\r\n)
+ if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
+ newLine = "\r\n";
+ }
}
/**
* Method to launch pig for hadoop either for a cluster's
@@ -80,7 +96,7 @@
*/
public abstract boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
throws PlanException, VisitorException, IOException, ExecException,
- JobCreationException;
+ JobCreationException, Exception;
/**
* Explain how a pig job will be executed on the underlying
@@ -102,20 +118,26 @@
return (int)(Math.ceil(prog)) == (int)1;
}
- protected void getStats(Job job, JobClient jobClient, boolean errNotDbg) {
+ protected void getStats(Job job, JobClient jobClient, boolean errNotDbg, PigContext pigContext) throws Exception {
JobID MRJobID = job.getAssignedJobID();
+
+ if(MRJobID == null) {
+ throw getExceptionFromString(job.getMessage());
+ }
try {
TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID);
- getErrorMessages(mapRep, "map", errNotDbg);
+ getErrorMessages(mapRep, "map", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(mapRep);
TaskReport[] redRep = jobClient.getReduceTaskReports(MRJobID);
- getErrorMessages(redRep, "reduce", errNotDbg);
+ getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(mapRep);
} catch (IOException e) {
if(job.getState() == Job.SUCCESS) {
// if the job succeeded, let the user know that
// we were unable to get statistics
log.warn("Unable to get job related diagnostics");
+ } else {
+ throw e;
}
}
}
@@ -128,17 +150,55 @@
return timeSpent;
}
- protected void getErrorMessages(TaskReport reports[], String type, boolean errNotDbg)
- {
- for (int i = 0; i < reports.length; i++) {
+
+ protected void getErrorMessages(TaskReport reports[], String type, boolean errNotDbg, PigContext pigContext) throws Exception
+ {
+ for (int i = 0; i < reports.length; i++) {
String msgs[] = reports[i].getDiagnostics();
- for (int j = 0; j < msgs.length; j++) {
- if (errNotDbg) {
- log.error("Error message from task (" + type + ") " +
- reports[i].getTaskID() + msgs[j]);
+ ArrayList<Exception> exceptions = new ArrayList<Exception>();
+ boolean jobFailed = false;
+ float successfulProgress = 1.0f;
+ if (msgs.length > 0) {
+ //if the progress reported is not 1.0f then the map or reduce job failed
+ //this comparison is in place till Hadoop 0.20 provides methods to query
+ //job status
+ if(reports[i].getProgress() != successfulProgress) {
+ jobFailed = true;
+ }
+ Set<String> errorMessageSet = new HashSet<String>();
+ for (int j = 0; j < msgs.length; j++) {
+ if(!errorMessageSet.contains(msgs[j])) {
+ errorMessageSet.add(msgs[j]);
+ if (errNotDbg) {
+ //errNotDbg is used only for failed jobs
+ //keep track of all the unique exceptions
+ Exception e = getExceptionFromString(msgs[j]);
+ exceptions.add(e);
+ } else {
+ log.debug("Error message from task (" + type + ") " +
+ reports[i].getTaskID() + msgs[j]);
+ }
+ }
+ }
+ }
+
+ //if its a failed job then check if there is more than one exception
+ //more than one exception implies possibly different kinds of failures
+ //log all the different failures and throw the exception corresponding
+ //to the first failure
+ if(jobFailed) {
+ if(exceptions.size() > 1) {
+ for(int j = 0; j < exceptions.size(); ++j) {
+ String headerMessage = "Error message from task (" + type + ") " + reports[i].getTaskID();
+ Utils.writeLog(exceptions.get(j), pigContext.getProperties().getProperty("pig.logfile"), log, false, headerMessage, false, false);
+ }
+ throw exceptions.get(0);
+ } else if(exceptions.size() == 1) {
+ throw exceptions.get(0);
} else {
- log.debug("Error message from task (" + type + ") " +
- reports[i].getTaskID() + msgs[j]);
+ int errCode = 2115;
+ String msg = "Internal error. Expected to throw exception from the backend. Did not find any exception to throw.";
+ throw new ExecException(msg, errCode, PigException.BUG);
}
}
}
@@ -190,4 +250,271 @@
public long getTotalHadoopTimeSpent() {
return totalHadoopTimeSpent;
}
+
+ /**
+ * Handles internal PigException and its subclasses that override the {@link Throwable#toString() toString} method
+ *
+ * @param stackTrace The string representation of {@link Throwable#printStackTrace() printStackTrace} output
+ * @return An exception object whose string representation of printStackTrace is the input stackTrace
+ * @throws Exception
+ */
+ Exception getExceptionFromString(String stackTrace) throws Exception{
+ String[] lines = stackTrace.split(newLine);
+ Throwable t = getExceptionFromStrings(lines, 0);
+
+ if(!pigException) {
+ int errCode = 6015;
+ String msg = "During execution, encountered a Hadoop error.";
+ ExecException ee = new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT, t);
+ ee.setStackTrace(t.getStackTrace());
+ return ee;
+ } else {
+ pigException = false;
+ if(outOfMemory) {
+ outOfMemory = false;
+ int errCode = 6016;
+ String msg = "Out of memory.";
+ ExecException ee = new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT, t);
+ ee.setStackTrace(t.getStackTrace());
+ return ee;
+ }
+ return (Exception)t;
+ }
+ }
+
+ /**
+ *
+ * @param stackTraceLines An array of strings that represent {@link Throwable#printStackTrace() printStackTrace}
+ * output, split by newline
+ * @param startingLineNum The index into stackTraceLines at which to start parsing
+ * @return An exception object whose string representation of printStackTrace is the input stackTrace
+ * @throws Exception
+ */
+ private Throwable getExceptionFromStrings(String[] stackTraceLines, int startingLineNum) throws Exception{
+ /*
+ * parse the array of strings and throw the appropriate exception
+ * first: from the line startingLineNum extract the exception name and the exception message, if any
+ * next: create stack trace elements from the remaining lines
+ * finally: create the appropriate exception and return it
+ * An example of the stack trace:
+ org.apache.pig.backend.executionengine.ExecException: ERROR 1075: Received a bytearray from the UDF. Cannot determine how to convert the bytearray to int.
+ at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast.getNext(POCast.java:152)
+ at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr.getNext(LessThanExpr.java:85)
+ at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter.getNext(POFilter.java:148)
+ at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:184)
+ at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:174)
+ at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map(PigMapOnly.java:65)
+ at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:47)
+ at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
+ at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
+ */
+
+ int prevStartingLineNum = startingLineNum;
+
+ if(stackTraceLines.length > 0 && startingLineNum < (stackTraceLines.length - 1)) {
+
+ //the regex for matching the exception class name; note the use of the $ for matching nested classes
+ String exceptionNameDelimiter = "(\\w+(\\$\\w+)?\\.)+\\w+";
+ Pattern exceptionNamePattern = Pattern.compile(exceptionNameDelimiter);
+
+ //from the first line extract the exception name and the exception message
+ Matcher exceptionNameMatcher = exceptionNamePattern.matcher(stackTraceLines[startingLineNum]);
+ String exceptionName = null;
+ String exceptionMessage = null;
+ if (exceptionNameMatcher.find()) {
+ exceptionName = exceptionNameMatcher.group();
+ /*
+ * note that the substring is from end + 2
+ * the regex matcher ends at one position beyond the match
+ * in this case it will end at colon (:)
+ * the exception message will have a preceding space (after the colon (:))
+ */
+ exceptionMessage = stackTraceLines[startingLineNum].substring(exceptionNameMatcher.end() + 2);
+ ++startingLineNum;
+ if (exceptionName.contains(OOM_ERR)) {
+ outOfMemory = true;
+ }
+ }
+
+ //the exceptionName should not be null
+ if(exceptionName != null) {
+
+ ArrayList<StackTraceElement> stackTraceElements = new ArrayList<StackTraceElement>();
+
+ //Create stack trace elements for the remaining lines
+ String stackElementRegex = "\\s+at\\s+(\\w+(\\$\\w+)?\\.)+(\\<)?\\w+(\\>)?";
+ Pattern stackElementPattern = Pattern.compile(stackElementRegex);
+ String pigExceptionRegex = "org\\.apache\\.pig\\.";
+ Pattern pigExceptionPattern = Pattern.compile(pigExceptionRegex);
+
+
+ String pigPackageRegex = "org.apache.pig";
+
+ int lineNum = startingLineNum;
+ for(; lineNum < (stackTraceLines.length - 1); ++lineNum) {
+ Matcher stackElementMatcher = stackElementPattern.matcher(stackTraceLines[lineNum]);
+
+ if(stackElementMatcher.find()) {
+ StackTraceElement ste = getStackTraceElement(stackTraceLines[lineNum]);
+ stackTraceElements.add(ste);
+ String className = ste.getClassName();
+ Matcher pigExceptionMatcher = pigExceptionPattern.matcher(className);
+ if(pigExceptionMatcher.find()) {
+ pigException = true;
+ }
+ } else {
+ break;
+ }
+ }
+
+ startingLineNum = lineNum;
+
+ //create the appropriate exception; setup the stack trace and message
+ Object object = PigContext.instantiateFuncFromSpec(exceptionName);
+
+ if(object instanceof PigException) {
+ //extract the error code and message; the regex matches the custom format ERROR <ERROR CODE>:
+ String errMessageRegex = "ERROR\\s+\\d+:";
+ Pattern errMessagePattern = Pattern.compile(errMessageRegex);
+ Matcher errMessageMatcher = errMessagePattern.matcher(exceptionMessage);
+
+ if(errMessageMatcher.find()) {
+ String errMessageStub = errMessageMatcher.group();
+ /*
+ * extract the actual exception message sans the ERROR <ERROR CODE>:
+ * again note that the matcher ends at the space following the colon (:)
+ * the exception message appears after the space and hence the end + 1
+ */
+ exceptionMessage = exceptionMessage.substring(errMessageMatcher.end() + 1);
+
+ //the regex to match the error code, which is a string of numerals
+ String errCodeRegex = "\\d+";
+ Pattern errCodePattern = Pattern.compile(errCodeRegex);
+ Matcher errCodeMatcher = errCodePattern.matcher(errMessageStub);
+
+ String code = null;
+ if(errCodeMatcher.find()) {
+ code = errCodeMatcher.group();
+ }
+
+ //could receive a number format exception here but it will be propagated up the stack
+ int errCode = Integer.parseInt(code);
+
+ //create the exception with the message and then set the error code and error source
+ FuncSpec funcSpec = new FuncSpec(exceptionName, exceptionMessage);
+ object = PigContext.instantiateFuncFromSpec(funcSpec);
+ ((PigException)object).setErrorCode(errCode);
+ ((PigException)object).setErrorSource(PigException.determineErrorSource(errCode));
+ } else { //else for if(errMessageMatcher.find())
+ /*
+ * did not find the error code which means that the PigException or its
+ * subclass is not returning the error code
+ * highly unlikely: should never be here
+ */
+ FuncSpec funcSpec = new FuncSpec(exceptionName, exceptionMessage);
+ object = PigContext.instantiateFuncFromSpec(funcSpec);
+ ((PigException)object).setErrorCode(2997);//generic error code
+ ((PigException)object).setErrorSource(PigException.BUG);
+ }
+ } else { //else for if(object instanceof PigException)
+ //it's not a PigException; create the exception with the message
+ object = PigContext.instantiateFuncFromSpec(exceptionName + "(" + exceptionMessage + ")");
+ }
+
+ StackTraceElement[] steArr = new StackTraceElement[stackTraceElements.size()];
+ ((Throwable)object).setStackTrace((StackTraceElement[])(stackTraceElements.toArray(steArr)));
+
+ if(startingLineNum < (stackTraceLines.length - 1)) {
+ Throwable e = getExceptionFromStrings(stackTraceLines, startingLineNum);
+ ((Throwable)object).initCause(e);
+ }
+
+ return (Throwable)object;
+ } else { //else for if(exceptionName != null)
+ int errCode = 2055;
+ String msg = "Did not find exception name to create exception from string: " + stackTraceLines.toString();
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ } else { //else for if(lines.length > 0)
+ int errCode = 2056;
+ String msg = "Cannot create exception from empty string.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ }
+
+ /**
+ *
+ * @param line the string representation of a stack trace returned by {@link Throwable#printStackTrace() printStackTrace}
+ * @return the StackTraceElement object representing the stack trace
+ * @throws Exception
+ */
+ public StackTraceElement getStackTraceElement(String line) throws Exception{
+ /*
+ * the format of the line is something like:
+ * at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map(PigMapOnly.java:65)
+ * note the white space before the 'at'. It's not of much importance but noted for posterity.
+ */
+ String[] items;
+
+ /*
+ * regex for matching the fully qualified method Name
+ * note the use of the $ for matching nested classes
+ * and the use of < and > for constructors
+ */
+ String qualifiedMethodNameRegex = "(\\w+(\\$\\w+)?\\.)+(<)?\\w+(>)?";
+ Pattern qualifiedMethodNamePattern = Pattern.compile(qualifiedMethodNameRegex);
+ Matcher contentMatcher = qualifiedMethodNamePattern.matcher(line);
+
+ //org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map(PigMapOnly.java:65)
+ String content = null;
+ if(contentMatcher.find()) {
+ content = line.substring(contentMatcher.start());
+ } else {
+ int errCode = 2057;
+ String msg = "Did not find fully qualified method name to reconstruct stack trace: " + line;
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ Matcher qualifiedMethodNameMatcher = qualifiedMethodNamePattern.matcher(content);
+
+ //org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map
+ String qualifiedMethodName = null;
+ //(PigMapOnly.java:65)
+ String fileDetails = null;
+
+ if(qualifiedMethodNameMatcher.find()) {
+ qualifiedMethodName = qualifiedMethodNameMatcher.group();
+ fileDetails = content.substring(qualifiedMethodNameMatcher.end() + 1);
+ } else {
+ int errCode = 2057;
+ String msg = "Did not find fully qualified method name to reconstruct stack trace: " + line;
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ //From the fully qualified method name, extract the declaring class and method name
+ items = qualifiedMethodName.split("\\.");
+
+ //initialize the declaringClass (to org in most cases)
+ String declaringClass = items[0];
+ //the last member is always the method name
+ String methodName = items[items.length - 1];
+
+ //concatenate the names by adding the dot (.) between the members till the penultimate member
+ for(int i = 1; i < items.length - 1; ++i) {
+ declaringClass += ".";
+ declaringClass += items[i];
+ }
+
+ //from the file details extract the file name and the line number
+ //PigMapOnly.java:65
+ fileDetails = fileDetails.substring(0, fileDetails.length() - 1);
+ items = fileDetails.split(":");
+ //PigMapOnly.java
+ String fileName = null;
+ int lineNumber = 0;
+ if(items.length > 0) {
+ fileName = items[0];
+ lineNumber = Integer.parseInt(items[1]);
+ }
+ return new StackTraceElement(declaringClass, methodName, fileName, lineNumber);
+ }
+
}
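A rough usage sketch of the round trip these methods implement: the backend reports a failure as printStackTrace text, and the launcher parses that text back into a throwable with the original error code and stack frames. The snippet below only exercises the public getStackTraceElement helper on a single frame line; constructing a MapReduceLauncher directly is just for illustration:

    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;

    public class StackTraceParseExample {
        public static void main(String[] args) throws Exception {
            MapReduceLauncher launcher = new MapReduceLauncher();
            // One frame in the format produced by Throwable.printStackTrace().
            String frame = "        at org.apache.pig.backend.hadoop.executionengine"
                + ".mapReduceLayer.PigMapOnly$Map.map(PigMapOnly.java:65)";
            StackTraceElement ste = launcher.getStackTraceElement(frame);
            System.out.println(ste.getClassName());    // ...PigMapOnly$Map
            System.out.println(ste.getMethodName());   // map
            System.out.println(ste.getFileName());     // PigMapOnly.java
            System.out.println(ste.getLineNumber());   // 65
        }
    }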
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Fri Feb 13 01:59:27 2009
@@ -56,7 +56,7 @@
String grpName,
PigContext pc) throws PlanException, VisitorException,
IOException, ExecException,
- JobCreationException {
+ JobCreationException, Exception {
long sleepTime = 500;
MROperPlan mrp = compile(php, pc);
@@ -95,7 +95,7 @@
log.error("Map reduce job failed");
for (Job fj : failedJobs) {
log.error(fj.getMessage());
- getStats(fj, jobClient, true);
+ getStats(fj, jobClient, true, pc);
}
jc.stop();
return false;
@@ -104,7 +104,7 @@
List<Job> succJobs = jc.getSuccessfulJobs();
if(succJobs!=null)
for(Job job : succJobs){
- getStats(job,jobClient, false);
+ getStats(job,jobClient, false, pc);
}
jc.stop();
@@ -166,7 +166,7 @@
//A purely testing method. Not to be used elsewhere
public boolean launchPigWithCombinePlan(PhysicalPlan php,
String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException,
- VisitorException, IOException, ExecException, JobCreationException {
+ VisitorException, IOException, ExecException, JobCreationException, Exception {
long sleepTime = 500;
MRCompiler comp = new MRCompiler(php, pc);
comp.compile();
@@ -212,13 +212,13 @@
throw new ExecException(
"Something terribly wrong with Job Control.");
for (Job job : failedJobs) {
- getStats(job, jobClient, true);
+ getStats(job, jobClient, true, pc);
}
}
List<Job> succJobs = jc.getSuccessfulJobs();
if (succJobs != null)
for (Job job : succJobs) {
- getStats(job, jobClient, false);
+ getStats(job, jobClient, false, pc);
}
jc.stop();
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 13 01:59:27 2009
@@ -871,9 +871,9 @@
curMROp.setFragment(op.getFragment());
curMROp.setReplFiles(op.getReplFiles());
}catch(Exception e){
- VisitorException pe = new VisitorException(e.getMessage());
- pe.initCause(e);
- throw pe;
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
@@ -958,7 +958,7 @@
}
}
- private int[] getSortCols(POSort sort) throws PlanException {
+ private int[] getSortCols(POSort sort) throws PlanException, ExecException {
List<PhysicalPlan> plans = sort.getSortPlans();
if(plans!=null){
int[] ret = new int[plans.size()];
@@ -1384,29 +1384,35 @@
break;
}
- // if input to project is the last input
- if (proj.getColumn() == pack.getNumInps())
- {
- // if we had already seen another project
- // which was also for the last input, then
- // we might be trying to flatten twice on the
- // last input in which case we can't optimize by
- // just streaming the tuple to those projects
- // IMPORTANT NOTE: THIS WILL NEED TO CHANGE WHEN WE
- // OPTIMIZE BUILTINS LIKE SUM() AND COUNT() TO
- // TAKE IN STREAMING INPUT
- if(projOfLastInput != null) {
- allSimple = false;
- break;
- }
- projOfLastInput = proj;
- // make sure the project is on a bag which needs to be
- // flattened
- if (!flatten.get(i) || proj.getResultType() != DataType.BAG)
+ try {
+ // if input to project is the last input
+ if (proj.getColumn() == pack.getNumInps())
{
- lastInputFlattened = false;
- break;
+ // if we had already seen another project
+ // which was also for the last input, then
+ // we might be trying to flatten twice on the
+ // last input in which case we can't optimize by
+ // just streaming the tuple to those projects
+ // IMPORTANT NOTE: THIS WILL NEED TO CHANGE WHEN WE
+ // OPTIMIZE BUILTINS LIKE SUM() AND COUNT() TO
+ // TAKE IN STREAMING INPUT
+ if(projOfLastInput != null) {
+ allSimple = false;
+ break;
+ }
+ projOfLastInput = proj;
+ // make sure the project is on a bag which needs to be
+ // flattened
+ if (!flatten.get(i) || proj.getResultType() != DataType.BAG)
+ {
+ lastInputFlattened = false;
+ break;
+ }
}
+ } catch (ExecException e) {
+ int errCode = 2069;
+ String msg = "Error during map reduce compilation. Problem in accessing column from project operator.";
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
// if all deeper operators are all project
@@ -1577,14 +1583,14 @@
if (mpLeaves.size() != 1) {
int errCode = 2024;
String msg = "Expected reduce to have single leaf. Found " + mpLeaves.size() + " leaves.";
- throw new PigException(msg, errCode, PigException.BUG);
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
}
PhysicalOperator mpLeaf = mpLeaves.get(0);
if (!(mpLeaf instanceof POStore)) {
int errCode = 2025;
String msg = "Expected leaf of reduce plan to " +
"always be POStore. Found " + mpLeaf.getClass().getSimpleName();
- throw new PigException(msg, errCode, PigException.BUG);
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
}
FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb 13 01:59:27 2009
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
@@ -27,6 +28,7 @@
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -49,6 +51,10 @@
*/
public class MapReduceLauncher extends Launcher{
private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
+
+ //used to track the exception thrown by the job control which is run in a separate thread
+ private Exception jobControlException = null;
+
@Override
public boolean launchPig(PhysicalPlan php,
String grpName,
@@ -56,7 +62,8 @@
VisitorException,
IOException,
ExecException,
- JobCreationException {
+ JobCreationException,
+ Exception {
long sleepTime = 5000;
MROperPlan mrp = compile(php, pc);
@@ -71,7 +78,12 @@
int numMRJobs = jc.getWaitingJobs().size();
- new Thread(jc).start();
+ //create the exception handler for the job control thread
+ //and register the handler with the job control thread
+ JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
+ Thread jcThread = new Thread(jc);
+ jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+ jcThread.start();
double lastProg = -1;
int perCom = 0;
@@ -87,13 +99,26 @@
}
lastProg = prog;
}
+
+ //check for the jobControlException first
+ //if the job controller fails before launching the jobs then there are
+ //no jobs to check for failure
+ if(jobControlException != null) {
+ if(jobControlException instanceof PigException) {
+ throw jobControlException;
+ } else {
+ int errCode = 2117;
+ String msg = "Unexpected error when launching map reduce job.";
+ throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
+ }
+ }
+
// Look to see if any jobs failed. If so, we need to report that.
List<Job> failedJobs = jc.getFailedJobs();
if (failedJobs != null && failedJobs.size() > 0) {
log.error("Map reduce job failed");
for (Job fj : failedJobs) {
- log.error(fj.getMessage());
- getStats(fj, jobClient, true);
+ getStats(fj, jobClient, true, pc);
}
jc.stop();
return false;
@@ -102,7 +127,7 @@
List<Job> succJobs = jc.getSuccessfulJobs();
if(succJobs!=null)
for(Job job : succJobs){
- getStats(job,jobClient, false);
+ getStats(job,jobClient, false, pc);
}
jc.stop();
@@ -160,5 +185,27 @@
kdv.visit();
return plan;
}
+
+ /**
+ * An exception handler class to handle exceptions thrown by the job control thread.
+ * It's a local class. This is the only mechanism to catch unhandled thread exceptions:
+ * unhandled exceptions in threads are handled by the VM if the handler is not registered
+ * explicitly or if the default handler is null.
+ */
+ class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+ public void uncaughtException(Thread thread, Throwable throwable) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ throwable.printStackTrace(ps);
+ String exceptionString = baos.toString();
+ try {
+ jobControlException = getExceptionFromString(exceptionString);
+ } catch (Exception e) {
+ String errMsg = "Could not resolve error that occured when launching map reduce job.";
+ jobControlException = new RuntimeException(errMsg, e);
+ }
+ }
+ }
}
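A minimal standalone sketch of the mechanism used above, independent of Hadoop: registering an UncaughtExceptionHandler on the worker thread is the only way for the launching thread to observe an exception that escapes the worker's run method, since such an exception never propagates back through start(). All names below are illustrative:

    public class UncaughtHandlerSketch {
        // Plays the role of jobControlException in MapReduceLauncher.
        private static volatile Throwable workerFailure;

        public static void main(String[] args) throws InterruptedException {
            Thread worker = new Thread(new Runnable() {
                public void run() {
                    throw new RuntimeException("simulated failure inside the worker thread");
                }
            });
            // Without a registered handler the VM merely prints the stack trace;
            // the thread that called start() never sees the exception.
            worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                public void uncaughtException(Thread t, Throwable e) {
                    workerFailure = e;
                }
            });
            worker.start();
            worker.join();
            System.out.println("captured: " + workerFailure);
        }
    }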
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Feb 13 01:59:27 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -109,9 +110,9 @@
roots = cp.getRoots().toArray(new PhysicalOperator[1]);
leaf = cp.getLeaves().get(0);
}
- } catch (IOException e) {
- log.error(e.getMessage() + "was caused by:");
- log.error(e.getCause().getMessage());
+ } catch (IOException ioe) {
+ String msg = "Problem while configuring combiner's reduce plan.";
+ throw new RuntimeException(msg, ioe);
}
}
@@ -195,13 +196,13 @@
continue;
if(redRes.returnStatus==POStatus.STATUS_ERR){
+ int errCode = 2090;
String msg = "Received Error while " +
"processing the combine plan.";
if(redRes.result != null) {
msg += redRes.result;
}
- IOException ioe = new IOException(msg);
- throw ioe;
+ throw new ExecException(msg, errCode, PigException.BUG);
}
}
}
@@ -210,8 +211,9 @@
return false;
if(res.returnStatus==POStatus.STATUS_ERR){
- IOException ioe = new IOException("Packaging error while processing group");
- throw ioe;
+ int errCode = 2091;
+ String msg = "Packaging error while processing group.";
+ throw new ExecException(msg, errCode, PigException.BUG);
}
if(res.returnStatus==POStatus.STATUS_EOP) {
@@ -221,9 +223,7 @@
return false;
} catch (ExecException e) {
- IOException ioe = new IOException(e.getMessage());
- ioe.initCause(e.getCause());
- throw ioe;
+ throw e;
}
}
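
Throughout this patch, anonymous IOExceptions are replaced with ExecExceptions that carry a numeric error code and an error source such as PigException.BUG, and they can still be thrown from methods declared "throws IOException" because ExecException is (via PigException) an IOException subtype. The sketch below shows only the shape of such a coded exception; the class, constant and field names are illustrative, not the real Pig classes.

    import java.io.IOException;

    // Illustrative coded exception in the spirit of ExecException/PigException.
    // The BUG constant and the field names are assumptions made for this sketch.
    class CodedException extends IOException {
        static final byte BUG = 4;        // marker for internal (non-user) errors

        private final int errorCode;
        private final byte errorSource;

        CodedException(String msg, int errorCode, byte errorSource, Throwable cause) {
            super(msg);
            if (cause != null) {
                initCause(cause);         // keep the original failure attached
            }
            this.errorCode = errorCode;
            this.errorSource = errorSource;
        }

        int getErrorCode()    { return errorCode; }
        byte getErrorSource() { return errorSource; }
    }

A caller in the combine path would then throw, for example, new CodedException("Packaging error while processing group.", 2091, CodedException.BUG, null) instead of a bare IOException, so the error code can be mapped back to a known failure mode.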
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Feb 13 01:59:27 2009
@@ -39,9 +39,11 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
import org.apache.pig.data.TargetedTuple;
import org.apache.pig.Slice;
import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.PigSlicer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
@@ -100,7 +102,9 @@
protected Path[] listPaths(JobConf job) throws IOException {
Path[] dirs = FileInputFormat.getInputPaths(job);
if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
+ int errCode = 2092;
+ String msg = "No input paths specified in job.";
+ throw new ExecException(msg, errCode, PigException.BUG);
}
List<Path> result = new ArrayList<Path>();
@@ -176,37 +180,54 @@
*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
- ArrayList<Pair<FileSpec, Boolean>> inputs = (ArrayList<Pair<FileSpec, Boolean>>) ObjectSerializer
- .deserialize(job.get("pig.inputs"));
- ArrayList<ArrayList<OperatorKey>> inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
- .deserialize(job.get("pig.inpTargets"));
- PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job
- .get("pig.pigContext"));
+ ArrayList<Pair<FileSpec, Boolean>> inputs;
+ ArrayList<ArrayList<OperatorKey>> inpTargets;
+ PigContext pigContext;
+ try {
+ inputs = (ArrayList<Pair<FileSpec, Boolean>>) ObjectSerializer
+ .deserialize(job.get("pig.inputs"));
+ inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
+ .deserialize(job.get("pig.inpTargets"));
+ pigContext = (PigContext) ObjectSerializer.deserialize(job
+ .get("pig.pigContext"));
+ } catch (Exception e) {
+ int errCode = 2094;
+ String msg = "Unable to deserialize object.";
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
for (int i = 0; i < inputs.size(); i++) {
- Path path = new Path(inputs.get(i).first.getFileName());
- FileSystem fs = path.getFileSystem(job);
- // if the execution is against Mapred DFS, set
- // working dir to /user/<userid>
- if(pigContext.getExecType() == ExecType.MAPREDUCE)
- fs.setWorkingDirectory(new Path("/user", job.getUser()));
-
- DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
- ValidatingInputFileSpec spec;
- if (inputs.get(i).first instanceof ValidatingInputFileSpec) {
- spec = (ValidatingInputFileSpec) inputs.get(i).first;
- } else {
- spec = new ValidatingInputFileSpec(inputs.get(i).first, store);
- }
- boolean isSplittable = inputs.get(i).second;
- if (isSplittable && (spec.getSlicer() instanceof PigSlicer)) {
- ((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
- }
- Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());
- for (Slice split : pigs) {
- splits.add(new SliceWrapper(split, pigContext, i, fs, inpTargets.get(i)));
- }
+ try {
+ Path path = new Path(inputs.get(i).first.getFileName());
+ FileSystem fs = path.getFileSystem(job);
+ // if the execution is against Mapred DFS, set
+ // working dir to /user/<userid>
+ if(pigContext.getExecType() == ExecType.MAPREDUCE)
+ fs.setWorkingDirectory(new Path("/user", job.getUser()));
+
+ DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
+ ValidatingInputFileSpec spec;
+ if (inputs.get(i).first instanceof ValidatingInputFileSpec) {
+ spec = (ValidatingInputFileSpec) inputs.get(i).first;
+ } else {
+ spec = new ValidatingInputFileSpec(inputs.get(i).first, store);
+ }
+ boolean isSplittable = inputs.get(i).second;
+ if (isSplittable && (spec.getSlicer() instanceof PigSlicer)) {
+ ((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
+ }
+ Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());
+ for (Slice split : pigs) {
+ splits.add(new SliceWrapper(split, pigContext, i, fs, inpTargets.get(i)));
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2118;
+ String msg = "Unable to create input slice for: " + inputs.get(i).first.getFileName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
}
return splits.toArray(new SliceWrapper[splits.size()]);
}
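
getSplits() now pulls pig.inputs, pig.inpTargets and pig.pigContext out of the JobConf inside a try/catch and converts any failure into error 2094. The mechanism underneath is serializing a Java object into a String property and reading it back; a rough, JDK-only sketch of that round trip is below (the class name and the Base64 encoding are assumptions for illustration, not Pig's ObjectSerializer).

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.Base64;

    // Round-trips a Serializable object through a String, the kind of value that
    // ends up in JobConf properties such as "pig.inputs".
    public final class StringSerializer {

        public static String serialize(Serializable obj) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(obj);
            out.close();
            return Base64.getEncoder().encodeToString(bytes.toByteArray());
        }

        public static Object deserialize(String encoded) throws IOException {
            try {
                byte[] bytes = Base64.getDecoder().decode(encoded);
                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
                return in.readObject();
            } catch (Exception e) {
                // mirror the patch: one wrap point with a descriptive message
                throw new IOException("Unable to deserialize object.", e);
            }
        }
    }

The same wrapping also covers the ClassNotFoundException case that SliceWrapper.readObject() handles further down in this change.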
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Fri Feb 13 01:59:27 2009
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.TargetedTuple;
import org.apache.pig.data.Tuple;
@@ -96,9 +97,7 @@
try {
runPipeline(leaf);
} catch (ExecException e) {
- IOException ioe = new IOException("Error running pipeline in close() of map");
- ioe.initCause(e);
- throw ioe;
+ throw e;
}
}
@@ -151,9 +150,9 @@
roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
leaf = mp.getLeaves().get(0);
}
- } catch (IOException e) {
- log.error(e.getMessage() + "was caused by:");
- log.error(e.getCause().getMessage());
+ } catch (IOException ioe) {
+ String msg = "Problem while configuring map plan.";
+ throw new RuntimeException(msg, ioe);
}
}
@@ -183,9 +182,7 @@
try{
collect(oc,inpTuple);
} catch (ExecException e) {
- IOException ioe = new IOException(e.getMessage());
- ioe.initCause(e.getCause());
- throw ioe;
+ throw e;
}
return;
}
@@ -197,9 +194,7 @@
runPipeline(leaf);
} catch (ExecException e) {
- IOException ioe = new IOException(e.getMessage());
- ioe.initCause(e.getCause());
- throw ioe;
+ throw e;
}
}
@@ -232,8 +227,9 @@
"processing the map plan.";
}
- IOException ioe = new IOException(errMsg);
- throw ioe;
+ int errCode = 2055;
+ ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
+ throw ee;
}
}
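
Several of the PigMapBase (and PigMapReduce) hunks collapse "catch (ExecException e) { wrap in IOException; throw; }" down to a plain rethrow. That compiles without a wrapper only because ExecException is itself an IOException subtype, so it satisfies the "throws IOException" clause on map() and close(). A tiny illustration, with DemoExecException standing in for ExecException:

    import java.io.IOException;

    // DemoExecException plays the role of ExecException: an IOException subtype
    // that can propagate unchanged from a method declared "throws IOException".
    class DemoExecException extends IOException {
        DemoExecException(String msg) { super(msg); }
    }

    public class RethrowDemo {

        static void pipeline() throws DemoExecException {
            throw new DemoExecException("Received Error while processing the map plan.");
        }

        // same signature shape as map()/close(): only IOException is declared
        static void map() throws IOException {
            try {
                pipeline();
            } catch (DemoExecException e) {
                throw e;   // no wrapper needed; message, cause and stack are preserved
            }
        }

        public static void main(String[] args) {
            try {
                map();
            } catch (IOException e) {
                System.err.println("caught: " + e.getMessage());
            }
        }
    }

The old wrapping (new IOException(e.getMessage()) plus initCause(e.getCause())) dropped e itself from the exception chain, which is part of what this change fixes.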
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Feb 13 01:59:27 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -188,9 +189,9 @@
roots = rp.getRoots().toArray(new PhysicalOperator[1]);
leaf = rp.getLeaves().get(0);
}
- } catch (IOException e) {
- log.error(e.getMessage() + "was caused by:");
- log.error(e.getCause().getMessage());
+ } catch (IOException ioe) {
+ String msg = "Problem while configuring reduce plan.";
+ throw new RuntimeException(msg, ioe);
}
}
@@ -258,8 +259,9 @@
}
if(res.returnStatus==POStatus.STATUS_ERR){
- IOException ioe = new IOException("Packaging error while processing group");
- throw ioe;
+ int errCode = 2093;
+ String msg = "Encountered error in package operator while processing group.";
+ throw new ExecException(msg, errCode, PigException.BUG);
}
if(res.returnStatus==POStatus.STATUS_EOP) {
@@ -268,9 +270,7 @@
return false;
} catch (ExecException e) {
- IOException ioe = new IOException(e.getMessage());
- ioe.initCause(e.getCause());
- throw ioe;
+ throw e;
}
}
@@ -300,17 +300,16 @@
// close() we can do the right thing
errorInReduce = true;
// if there is an errmessage use it
- String errMsg;
+ String msg;
if(redRes.result != null) {
- errMsg = "Received Error while " +
+ msg = "Received Error while " +
"processing the reduce plan: " + redRes.result;
} else {
- errMsg = "Received Error while " +
+ msg = "Received Error while " +
"processing the reduce plan.";
}
-
- IOException ioe = new IOException(errMsg);
- throw ioe;
+ int errCode = 2090;
+ throw new ExecException(msg, errCode, PigException.BUG);
}
}
@@ -344,9 +343,7 @@
try {
runPipeline(leaf);
} catch (ExecException e) {
- IOException ioe = new IOException("Error running pipeline in close() of reduce");
- ioe.initCause(e);
- throw ioe;
+ throw e;
}
}
@@ -410,7 +407,7 @@
try {
key = HDataType.getWritableComparableTypes(t.get(0), keyType);
} catch (ExecException e) {
- throw WrappedIOException.wrap(e);
+ throw e;
}
}
@@ -440,15 +437,14 @@
}
if(res.returnStatus==POStatus.STATUS_ERR){
- IOException ioe = new IOException("Packaging error while processing group");
- throw ioe;
+ int errCode = 2093;
+ String msg = "Encountered error in package operator while processing group.";
+ throw new ExecException(msg, errCode, PigException.BUG);
}
} catch (ExecException e) {
- IOException ioe = new IOException(e.getMessage());
- ioe.initCause(e.getCause());
- throw ioe;
+ throw e;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Fri Feb 13 01:59:27 2009
@@ -30,7 +30,9 @@
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
+import org.apache.pig.PigException;
import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -63,11 +65,9 @@
store = (StoreFunc) PigContext
.instantiateFuncFromSpec(storeFunc);
} catch (Exception e) {
- RuntimeException re = new RuntimeException(e.getClass()
- .getName()
- + ": " + e.getMessage());
- re.setStackTrace(e.getStackTrace());
- throw re;
+ int errCode = 2081;
+ String msg = "Unable to setup the store function.";
+ throw new ExecException(msg, errCode, PigException.BUG, e);
}
}
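
getRecordWriter() now wraps any failure to instantiate the StoreFunc into error 2081 with the original exception attached, instead of copying its message and stack trace into a fresh RuntimeException. The sketch below shows only the shape of that change using plain reflection; PigContext.instantiateFuncFromSpec does more (for example, handling constructor arguments in the spec), so this is not a substitute for it.

    import java.io.IOException;

    // Instantiate a function by class name and wrap any failure with a
    // descriptive message and the original cause (error code shown in the
    // message for illustration only).
    public class StoreFuncFactory {

        public static Object instantiate(String className) throws IOException {
            try {
                return Class.forName(className).newInstance();
            } catch (Exception e) {
                IOException ioe = new IOException("Unable to setup the store function. [2081]");
                ioe.initCause(e);
                throw ioe;
            }
        }

        public static void main(String[] args) throws IOException {
            // stand-in class; a real caller would pass the StoreFunc class name
            Object func = instantiate("java.util.ArrayList");
            System.out.println("instantiated: " + func.getClass().getName());
        }
    }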
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Fri Feb 13 01:59:27 2009
@@ -54,9 +54,8 @@
mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
- mLog.error("Unable to deserialize pig.sortOrder " +
- ioe.getMessage());
- throw new RuntimeException(ioe);
+ String msg = "Unable to deserialize pig.sortOrder";
+ throw new RuntimeException(msg, ioe);
}
if (mAsc == null) {
mAsc = new boolean[1];
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Fri Feb 13 01:59:27 2009
@@ -40,8 +40,10 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
import org.apache.pig.Slice;
import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.PigSlice;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
@@ -183,8 +185,9 @@
try {
return ois.readObject();
} catch (ClassNotFoundException cnfe) {
- IOException newE = wrapException(cnfe);
- throw newE;
+ int errCode = 2094;
+ String msg = "Unable to deserialize object.";
+ throw new ExecException(msg, errCode, PigException.BUG, cnfe);
}
}