You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2009/02/13 02:59:30 UTC

svn commit: r743952 [1/4] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/ src/org/apache/pig/backend/hadoop/datastorage/ src/org/apache/pig/backend/hadoop/executionengine/ sr...

Author: olga
Date: Fri Feb 13 01:59:27 2009
New Revision: 743952

URL: http://svn.apache.org/viewvc?rev=743952&view=rev
Log:
 PIG-590: error handling on the backend (sms via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java
    hadoop/pig/trunk/src/org/apache/pig/PigException.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SortPartitioner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/ARITY.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BagSize.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/CONCAT.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/ConstantSize.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DIFF.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/MapSize.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/SIZE.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/StringConcat.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/StringSize.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/TOKENIZE.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/TextLoader.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/TupleSize.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
    hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/OptimizerException.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Utils.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPOCogroup.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Feb 13 01:59:27 2009
@@ -419,3 +419,6 @@
 
     PIG-665: Map key type not correctly set (for use when key is null) when
     map plan does not have localrearrange (pradeepkth)
+
+    PIG-590: error handling on the backend (sms via olgan)
+

Modified: hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java Fri Feb 13 01:59:27 2009
@@ -38,6 +38,16 @@
     
     /**
      * @param className the name of the class for the udf
+     * @param ctorArg the argument for the constructor for the above class
+     */
+    public FuncSpec(String className, String ctorArg) {
+        this.className = className;
+        this.ctorArgs = new String[1];
+        this.ctorArgs[0] = ctorArg;
+    }
+
+    /**
+     * @param className the name of the class for the udf
      * @param ctorArgs the arguments for the constructor for the above class
      */
     public FuncSpec(String className, String[] ctorArgs) {

Modified: hadoop/pig/trunk/src/org/apache/pig/PigException.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigException.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigException.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigException.java Fri Feb 13 01:59:27 2009
@@ -39,6 +39,7 @@
     public static final byte BUG = 4;
     public static final byte USER_ENVIRONMENT = 8;
     public static final byte REMOTE_ENVIRONMENT = 16;
+    public static final byte ERROR = -1;
 
     /**
      * A static method to query if an error source is due to
@@ -84,6 +85,25 @@
         return ((errSource & REMOTE_ENVIRONMENT) == 0 ? false : true);
     }
     
+    /**
+     * A static method to determine the error source given the error code
+     * 
+     *  @param errCode - integer error code
+     *  @return byte that indicates the error source
+     */
+    public static byte determineErrorSource(int errCode) {
+    	if(errCode >= 100 && errCode <= 1999) {
+    		return PigException.INPUT;
+    	} else if (errCode >= 2000 && errCode <= 2999) {
+    		return PigException.BUG;
+    	} else if (errCode >= 3000 && errCode <= 4999) {
+    		return PigException.USER_ENVIRONMENT;
+    	} else if (errCode >= 5000 && errCode <= 6999) {
+    		return PigException.REMOTE_ENVIRONMENT;
+    	}
+    	return PigException.ERROR;
+    }
+    
     protected int errorCode = 0;
     protected byte errorSource = BUG;
     protected boolean retriable = false;
@@ -292,4 +312,27 @@
         detailedMessage = detailMsg;
     }
 
+    /**
+     * Returns a short description of this throwable.
+     * The result is the concatenation of:
+     * <ul>
+     * <li> the {@linkplain Class#getName() name} of the class of this object
+     * <li> ": " (a colon and a space)
+     * <li> "ERROR " (the string ERROR followed by a space)
+     * <li> the result of invoking this object's {@link #getErrorCode} method
+     * <li> ": " (a colon and a space)
+     * <li> the result of invoking {@link Throwable#getLocalizedMessage() getLocalizedMessage}
+     *      method
+     * </ul>
+     * If <tt>getLocalizedMessage</tt> returns <tt>null</tt>, then just
+     * the class name is returned.
+     *
+     * @return a string representation of this throwable.
+     */
+    @Override
+    public String toString() {
+        String s = getClass().getName();
+        String message = getLocalizedMessage();
+        return (message != null) ? (s + ": " + "ERROR " + getErrorCode() + ": " + message) : s;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri Feb 13 01:59:27 2009
@@ -416,6 +416,7 @@
 //            ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
             ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext).toString(), BinStorage.class.getName() + "()");
             // invocation of "execute" is synchronous!
+
             if (job.getStatus() == JOB_STATUS.COMPLETED) {
                     return job.getResults();
             } else {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java Fri Feb 13 01:59:27 2009
@@ -29,6 +29,7 @@
 import java.util.zip.GZIPInputStream;
 
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
 import org.apache.pig.Slice;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -75,7 +76,9 @@
             try {
                 loader = (LoadFunc) PigContext.instantiateFuncFromSpec(parser);
             } catch (Exception exp) {
-                throw new RuntimeException("can't instantiate " + parser);
+                int errCode = 2081;
+                String msg = "Unable to set up the load function.";
+                throw new ExecException(msg, errCode, PigException.BUG, exp);
             }
         }
         fsis = base.asElement(base.getActiveContainer(), file).sopen();
@@ -152,9 +155,9 @@
         try {
             return ois.readObject();
         } catch (ClassNotFoundException cnfe) {
-            IOException newE = new IOException(cnfe.getMessage());
-            newE.initCause(cnfe);
-            throw newE;
+            int errCode = 2094;
+            String msg = "Unable to deserialize object.";
+            throw new ExecException(msg, errCode, PigException.BUG, cnfe);
         }
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java Fri Feb 13 01:59:27 2009
@@ -23,6 +23,7 @@
 import java.util.Map;
 
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
 import org.apache.pig.Slice;
 import org.apache.pig.Slicer;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -79,8 +80,10 @@
                     }
                     continue;
                 }
-            } catch (Exception e) { 
-                throw WrappedIOException.wrap(e);
+            } catch (Exception e) {
+                int errCode = 2099;
+                String msg = "Problem in constructing slices.";
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
             Map<String, Object> stats = fullPath.getStatistics();
             long bs = (Long) (stats.get(ElementDescriptor.BLOCK_SIZE_KEY));
@@ -112,7 +115,9 @@
 
     public void validate(DataStorage store, String location) throws IOException {
         if (!FileLocalizer.fileExists(location, store)) {
-            throw new IOException(store.asElement(location) + " does not exist");
+            int errCode = 2100;
+            String msg = store.asElement(location) + " does not exist.";
+            throw new ExecException(msg, errCode, PigException.BUG);
         }
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java Fri Feb 13 01:59:27 2009
@@ -94,8 +94,11 @@
         case DataType.TUPLE:
             return new NullableTuple((Tuple)o);
          
-        case DataType.MAP:
-            throw new RuntimeException("Map not supported as a key type!");
+        case DataType.MAP: {
+            int errCode = 1068;
+            String msg = "Using Map as key not supported.";
+            throw new ExecException(msg, errCode, PigException.INPUT);
+        }
 
         case DataType.NULL:
             switch (keyType) {
@@ -135,8 +138,12 @@
                 NullableTuple ntuple = new NullableTuple();
                 ntuple.setNull(true);
                 return ntuple;
-            case DataType.MAP:
-                throw new RuntimeException("Map not supported as a key type!");
+            case DataType.MAP: {
+                int errCode = 1068;
+                String msg = "Using Map as key not supported.";
+                throw new ExecException(msg, errCode, PigException.INPUT);
+            }
+            
             }
             break;
         default:
@@ -182,8 +189,11 @@
         case DataType.TUPLE:
             wcKey = defTup;
             break;
-        case DataType.MAP:
-            throw new RuntimeException("Map not supported as a key type!");
+        case DataType.MAP: {
+            int errCode = 1068;
+            String msg = "Using Map as key not supported.";
+            throw new ExecException(msg, errCode, PigException.INPUT);
+        }
         default:
             if (typeToName == null) typeToName = DataType.genTypeToNameMap();
             int errCode = 2044;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Fri Feb 13 01:59:27 2009
@@ -31,7 +31,9 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.*;
+import org.apache.pig.backend.executionengine.ExecException;
 
 public abstract class HPath implements ElementDescriptor {
 
@@ -89,8 +91,10 @@
                                        new Configuration());
         
         if (!result) {
-            throw new IOException("Failed to copy from: " + this.toString() +
-                                  " to: " + dstName.toString());
+            int errCode = 2097;
+            String msg = "Failed to copy from: " + this.toString() +
+            " to: " + dstName.toString();
+            throw new ExecException(msg, errCode, PigException.BUG);
         }
     }
     

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java Fri Feb 13 01:59:27 2009
@@ -22,7 +22,9 @@
 
 import org.apache.hadoop.fs.FSDataInputStream;
 
+import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.SeekableInputStream;
+import org.apache.pig.backend.executionengine.ExecException;
 
 public class HSeekableInputStream extends SeekableInputStream {
 
@@ -53,7 +55,9 @@
             break;
         }
         default: {
-            throw new IOException("Invalid seek option: " + whence);
+            int errCode = 2098;
+            String msg = "Invalid seek option: " + whence;
+            throw new ExecException(msg, errCode, PigException.BUG);
         }
         }
         

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Feb 13 01:59:27 2009
@@ -261,7 +261,7 @@
             if (e instanceof ExecException) throw (ExecException)e;
             else {
                 int errCode = 2043;
-                String msg = "Error during execution.";
+                String msg = "Unexpected error during execution.";
                 throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri Feb 13 01:59:27 2009
@@ -32,6 +32,7 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 
@@ -71,7 +72,9 @@
              p.bindTo(outFileSpec.getFileName(), new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
 
         }catch (Exception e){
-            throw new ExecException("Unable to get results for " + outFileSpec, e);
+            int errCode = 2088;
+            String msg = "Unable to get results for: " + outFileSpec;
+            throw new ExecException(msg, errCode, PigException.BUG, e);
         }
         
         return new Iterator<Tuple>() {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Fri Feb 13 01:59:27 2009
@@ -26,6 +26,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataType;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -48,6 +49,7 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
 
 /**
  * Optimize map reduce plans to use the combiner where possible.
@@ -278,7 +280,7 @@
                 } catch (Exception e) {
                     int errCode = 2018;
                     String msg = "Internal error. Unable to introduce the combiner for optimization.";
-                    throw new VisitorException(msg, errCode, PigException.BUG, e);
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
                 }
             }
         }
@@ -442,7 +444,9 @@
                     try {
                         dp.visit();
                     } catch (VisitorException e) {
-                        throw new PlanException(e);
+                        int errCode = 2073;
+                        String msg = "Problem with replacing distinct operator with distinct built-in function.";
+                        throw new PlanException(msg, errCode, PigException.BUG, e);
                     }
                     
                     
@@ -461,7 +465,13 @@
                     // to type Intermediate in combine plan and to type Final in
                     // the reduce
                     POUserFunc distinctFunc = (POUserFunc)getDistinctUserFunc(plans[j], leaf);
-                    distinctFunc.setAlgebraicFunction(funcTypes[j]);
+                    try {
+                        distinctFunc.setAlgebraicFunction(funcTypes[j]);
+                    } catch (ExecException e) {
+                        int errCode = 2074;
+                        String msg = "Could not configure distinct's algebraic functions in map reduce plan.";
+                        throw new PlanException(msg, errCode, PigException.BUG, e);
+                    }
                 }
                 
             }
@@ -474,7 +484,9 @@
             try {
                 new fixMapProjects(mpl).visit();
             } catch (VisitorException e) {
-                throw new PlanException(e);
+                int errCode = 2089;
+                String msg = "Unable to flag project operator to use single tuple bag.";
+                throw new PlanException(msg, errCode, PigException.BUG, e);
             }
         }
 
@@ -619,7 +631,13 @@
             throw new PlanException(msg, errCode, PigException.BUG);
         }
         POUserFunc func = (POUserFunc)leaf;
-        func.setAlgebraicFunction(type);
+        try {
+            func.setAlgebraicFunction(type);
+        } catch (ExecException e) {
+            int errCode = 2075;
+            String msg = "Could not set algebraic function type.";
+            throw new PlanException(msg, errCode, PigException.BUG, e);
+        }
     }
 
     private void fixUpRearrange(POLocalRearrange rearrange) throws PlanException {
@@ -802,8 +820,9 @@
                 if(patched) {
                     // we should not already have been patched since the
                     // Project-Distinct pair should occur only once
-                    throw new VisitorException(
-                            "Unexpected Project-Distinct pair while trying to set up plans for use with combiner.");
+                    int errCode = 2076;
+                    String msg = "Unexpected Project-Distinct pair while trying to set up plans for use with combiner.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG);
                 }
                 // we have stick in the POUserfunc(org.apache.pig.builtin.Distinct)[DataBag]
                 // in place of the Project-PODistinct pair
@@ -824,11 +843,13 @@
                     func.setResultType(DataType.BAG);
                     mPlan.replace(proj, func);
                     mPlan.remove(pred);
-                    // connect the the newly add "func" to
+                    // connect the newly added "func" to
                     // the predecessor to the earlier PODistinct
                     mPlan.connect(distinctPredecessor, func);
                 } catch (PlanException e) {
-                    throw new VisitorException(e);
+                    int errCode = 2077;
+                    String msg = "Problem with reconfiguring plan to add distinct built-in function.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
                 }
                 patched = true;
             } 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 13 01:59:27 2009
@@ -46,6 +46,7 @@
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -605,7 +606,7 @@
                         int errCode = 6003;
                         String msg = "Invalid cache specification. " +
                         "File doesn't exist: " + src;
-                        throw new PigException(msg, errCode, PigException.USER_ENVIRONMENT);
+                        throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
                     }
                     
                     // Ship it to the cluster if necessary and add to the
@@ -636,7 +637,7 @@
                             }
                             String msg = "Invalid ship specification. " +
                             "File doesn't exist: " + dst;
-                            throw new PigException(msg, errCode, errSrc);
+                            throw new ExecException(msg, errCode, errSrc);
                         }
                         DistributedCache.addCacheFile(dstURI, conf);
                     } else {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Fri Feb 13 01:59:27 2009
@@ -19,7 +19,12 @@
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,6 +35,8 @@
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
@@ -40,14 +47,23 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.grunt.Utils;
 
 public abstract class Launcher {
     private static final Log log = LogFactory.getLog(Launcher.class);
     
-    long totalHadoopTimeSpent;
+    long totalHadoopTimeSpent;    
+    String newLine = "\n";
+    boolean pigException = false;
+    boolean outOfMemory = false;
+    final String OOM_ERR = "OutOfMemoryError";
     
     protected Launcher(){
         totalHadoopTimeSpent = 0;
+        //handle the windows portion of \r
+        if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
+            newLine = "\r\n";
+        }
     }
     /**
      * Method to launch pig for hadoop either for a cluster's
@@ -80,7 +96,7 @@
      */
     public abstract boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
             throws PlanException, VisitorException, IOException, ExecException,
-            JobCreationException;
+            JobCreationException, Exception;
 
     /**
      * Explain how a pig job will be executed on the underlying
@@ -102,20 +118,26 @@
         return (int)(Math.ceil(prog)) == (int)1;
     }
     
-    protected void getStats(Job job, JobClient jobClient, boolean errNotDbg) {
+    protected void getStats(Job job, JobClient jobClient, boolean errNotDbg, PigContext pigContext) throws Exception {
         JobID MRJobID = job.getAssignedJobID();
+
+        if(MRJobID == null) {
+        	throw getExceptionFromString(job.getMessage()); 
+        }
         try {
             TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID);
-            getErrorMessages(mapRep, "map", errNotDbg);
+            getErrorMessages(mapRep, "map", errNotDbg, pigContext);
             totalHadoopTimeSpent += computeTimeSpent(mapRep);
             TaskReport[] redRep = jobClient.getReduceTaskReports(MRJobID);
-            getErrorMessages(redRep, "reduce", errNotDbg);
+            getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
             totalHadoopTimeSpent += computeTimeSpent(mapRep);
         } catch (IOException e) {
             if(job.getState() == Job.SUCCESS) {
                 // if the job succeeded, let the user know that
                 // we were unable to get statistics
                 log.warn("Unable to get job related diagnostics");
+            } else {
+            	throw e;
             }
         }
     }
@@ -128,17 +150,55 @@
         return timeSpent;
     }
     
-    protected void getErrorMessages(TaskReport reports[], String type, boolean errNotDbg)
-    {
-        for (int i = 0; i < reports.length; i++) {
+    
+    protected void getErrorMessages(TaskReport reports[], String type, boolean errNotDbg, PigContext pigContext) throws Exception
+    {        
+    	for (int i = 0; i < reports.length; i++) {
             String msgs[] = reports[i].getDiagnostics();
-            for (int j = 0; j < msgs.length; j++) {
-                if (errNotDbg) {
-                    log.error("Error message from task (" + type + ") " +
-                        reports[i].getTaskID() + msgs[j]);
+            ArrayList<Exception> exceptions = new ArrayList<Exception>();
+            boolean jobFailed = false;
+            float successfulProgress = 1.0f;
+            if (msgs.length > 0) {
+            	//if the progress reported is not 1.0f then the map or reduce job failed
+            	//this comparison is in place till Hadoop 0.20 provides methods to query
+            	//job status            	
+            	if(reports[i].getProgress() != successfulProgress) {
+            		jobFailed = true;
+            	}
+                Set<String> errorMessageSet = new HashSet<String>();
+                for (int j = 0; j < msgs.length; j++) {                	
+	            	if(!errorMessageSet.contains(msgs[j])) {
+	            	    errorMessageSet.add(msgs[j]);
+		            	if (errNotDbg) {
+		            		//errNotDbg is used only for failed jobs
+		            	    //keep track of all the unique exceptions
+			                Exception e = getExceptionFromString(msgs[j]);
+			                exceptions.add(e);
+		                } else {
+		                    log.debug("Error message from task (" + type + ") " +
+		                        reports[i].getTaskID() + msgs[j]);
+		                }
+	            	}
+	            }
+            }
+            
+            //if it's a failed job then check if there is more than one exception
+            //more than one exception implies possibly different kinds of failures
+            //log all the different failures and throw the exception corresponding
+            //to the first failure
+            if(jobFailed) {
+                if(exceptions.size() > 1) {
+                    for(int j = 0; j < exceptions.size(); ++j) {
+                        String headerMessage = "Error message from task (" + type + ") " + reports[i].getTaskID();
+                        Utils.writeLog(exceptions.get(j), pigContext.getProperties().getProperty("pig.logfile"), log, false, headerMessage, false, false);
+                    }
+                    throw exceptions.get(0);
+                } else if(exceptions.size() == 1) {
+                	throw exceptions.get(0);
                 } else {
-                    log.debug("Error message from task (" + type + ") " +
-                        reports[i].getTaskID() + msgs[j]);
+                	int errCode = 2115;
+                	String msg = "Internal error. Expected to throw exception from the backend. Did not find any exception to throw.";
+                	throw new ExecException(msg, errCode, PigException.BUG);
                 }
             }
         }
@@ -190,4 +250,271 @@
     public long getTotalHadoopTimeSpent() {
         return totalHadoopTimeSpent;
     }
+
+    /**
+     * Rebuilds an exception object from the text of a {@link Throwable#printStackTrace() printStackTrace} dump.
+     * Handles internal PigException and its subclasses that override the {@link Throwable#toString() toString} method
+     * @param stackTrace The string representation of {@link Throwable#printStackTrace() printStackTrace}
+     * @return the rebuilt exception when a Pig frame was seen; otherwise an ExecException wrapping it (6016 for an OutOfMemoryError seen with Pig frames, 6015 for everything else)
+     * @throws Exception if getExceptionFromStrings cannot rebuild an exception from the text
+     */
+    Exception getExceptionFromString(String stackTrace) throws Exception{
+        Throwable t;
+        boolean sawPigFrame, sawOutOfMemory;
+        try {
+            t = getExceptionFromStrings(stackTrace.split(newLine), 0);
+        } finally {
+            //always snapshot and clear the parse flags, even when parsing throws, so stale state never leaks into the next call
+            sawPigFrame = pigException;
+            sawOutOfMemory = outOfMemory;
+            pigException = false;
+            outOfMemory = false;
+        }
+        //a rebuilt Error cannot be returned as an Exception; it is wrapped below instead of failing the cast
+        if(sawPigFrame && !sawOutOfMemory && t instanceof Exception) {
+            return (Exception)t;
+        }
+        boolean oomInPig = sawPigFrame && sawOutOfMemory;
+        int errCode = oomInPig ? 6016 : 6015;
+        String msg = oomInPig ? "Out of memory." : "During execution, encountered a Hadoop error.";
+        ExecException ee = new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT, t);
+        ee.setStackTrace(t.getStackTrace());
+        return ee;
+    }
+
+    /**
+     * Rebuilds a throwable from printStackTrace text; lines after its frames are parsed recursively and attached as its cause.
+     * @param stackTraceLines An array of strings that represent {@link Throwable#printStackTrace() printStackTrace} output, split by newline
+     * @param startingLineNum index of the line holding the exception name and message at which parsing starts
+     * @return An exception object whose string representation of printStackTrace is the input stackTrace 
+     * @throws Exception for example an ExecException if no exception name is found (2055) or there is nothing left to parse (2056)
+     */
+    private Throwable getExceptionFromStrings(String[] stackTraceLines, int startingLineNum) throws Exception{
+        /*
+         * parse the array of string and throw the appropriate exception
+         * first: from the line startingLineNum extract the exception name and the message, if any
+         * then: collect the stack frames that follow, create the appropriate exception and return it
+         * An example of the stack trace:
+        org.apache.pig.backend.executionengine.ExecException: ERROR 1075: Received a bytearray from the UDF. Cannot determine how to convert the bytearray to int.
+        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast.getNext(POCast.java:152)
+        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr.getNext(LessThanExpr.java:85)
+        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter.getNext(POFilter.java:148)
+        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:184)
+        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:174)
+        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map(PigMapOnly.java:65)
+        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:47)
+        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
+        at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
+         */
+
+        //side effects: sets the pigException and outOfMemory fields when a Pig frame or an OutOfMemoryError is seen
+        if(stackTraceLines.length > 0 && startingLineNum < (stackTraceLines.length - 1)) {
+
+            //the regex for matching the exception class name; note the use of the $ for matching nested classes
+            String exceptionNameDelimiter = "(\\w+(\\$\\w+)?\\.)+\\w+";
+            Pattern exceptionNamePattern = Pattern.compile(exceptionNameDelimiter);
+
+            //from the first line extract the exception name and the exception message
+            Matcher exceptionNameMatcher = exceptionNamePattern.matcher(stackTraceLines[startingLineNum]);
+            String exceptionName = null;
+            String exceptionMessage = null;
+            if (exceptionNameMatcher.find()) {
+                exceptionName = exceptionNameMatcher.group();
+                /*
+                 * the matcher ends one position beyond the class name, i.e. at the colon (:)
+                 * the message follows the colon and one space, hence end + 2
+                 * a header without a message (e.g. a bare java.lang.NullPointerException) has nothing
+                 * after the name, so the offset is clamped and the message becomes the empty string
+                 */
+                String headerLine = stackTraceLines[startingLineNum];
+                exceptionMessage = headerLine.substring(Math.min(exceptionNameMatcher.end() + 2, headerLine.length()));
+                ++startingLineNum;
+                if (exceptionName.contains(OOM_ERR)) {
+                    outOfMemory = true;
+                }
+            }
+
+            //the exceptionName should not be null
+            if(exceptionName != null) {
+
+                ArrayList<StackTraceElement> stackTraceElements = new ArrayList<StackTraceElement>();
+
+                //Create stack trace elements for the remaining lines
+                String stackElementRegex = "\\s+at\\s+(\\w+(\\$\\w+)?\\.)+(\\<)?\\w+(\\>)?";
+                Pattern stackElementPattern = Pattern.compile(stackElementRegex);
+                String pigExceptionRegex = "org\\.apache\\.pig\\.";
+                Pattern pigExceptionPattern = Pattern.compile(pigExceptionRegex);
+
+                //frames are consumed up to the first line that does not look like a frame; parsing of a cause resumes there
+
+                int lineNum = startingLineNum;
+                for(; lineNum < (stackTraceLines.length - 1); ++lineNum) {
+                    Matcher stackElementMatcher = stackElementPattern.matcher(stackTraceLines[lineNum]);
+
+                    if(stackElementMatcher.find()) {
+                        StackTraceElement ste = getStackTraceElement(stackTraceLines[lineNum]);
+                        stackTraceElements.add(ste);
+                        String className = ste.getClassName();
+                        Matcher pigExceptionMatcher = pigExceptionPattern.matcher(className);
+                        if(pigExceptionMatcher.find()) {
+                            pigException = true;
+                        }
+                    } else {
+                        break;
+                    }
+                }
+
+                startingLineNum = lineNum;
+
+                //create the appropriate exception; setup the stack trace and message
+                Object object = PigContext.instantiateFuncFromSpec(exceptionName);
+
+                if(object instanceof PigException) {
+                    //extract the error code and message the regex for matching the custom format of ERROR <ERROR CODE>:
+                    String errMessageRegex = "ERROR\\s+\\d+:";
+                    Pattern errMessagePattern = Pattern.compile(errMessageRegex);
+                    Matcher errMessageMatcher = errMessagePattern.matcher(exceptionMessage);
+
+                    if(errMessageMatcher.find()) {
+                        String errMessageStub = errMessageMatcher.group();
+                        /*
+                         * extract the actual exception message sans the ERROR <ERROR CODE>:
+                         * the matcher ends at the space following the colon (:), hence the end + 1
+                         * the offset is clamped so that a bare "ERROR <ERROR CODE>:" yields an empty message
+                         */
+                        int bodyStart = Math.min(errMessageMatcher.end() + 1, exceptionMessage.length());
+                        exceptionMessage = exceptionMessage.substring(bodyStart);
+
+                        //the regex to match the error code which is a string of numerals
+                        String errCodeRegex = "\\d+";
+                        Pattern errCodePattern = Pattern.compile(errCodeRegex);
+                        Matcher errCodeMatcher = errCodePattern.matcher(errMessageStub);
+
+                        String code = null;
+                        if(errCodeMatcher.find()) {
+                            code = errCodeMatcher.group();
+                        }
+
+                        //could receive a number format exception here but it will be propagated up the stack
+                        int errCode = Integer.parseInt(code);
+
+                        //create the exception with the message and then set the error code and error source
+                        FuncSpec funcSpec = new FuncSpec(exceptionName, exceptionMessage);
+                        object = PigContext.instantiateFuncFromSpec(funcSpec);
+                        ((PigException)object).setErrorCode(errCode);
+                        ((PigException)object).setErrorSource(PigException.determineErrorSource(errCode));
+                    } else { //else for if(errMessageMatcher.find())
+                        /*
+                         * did not find the error code which means that the PigException or its
+                         * subclass is not returning the error code
+                         * highly unlikely: should never be here
+                         */
+                        FuncSpec funcSpec = new FuncSpec(exceptionName, exceptionMessage);
+                        object = PigContext.instantiateFuncFromSpec(funcSpec);
+                        ((PigException)object).setErrorCode(2997);//generic error code
+                        ((PigException)object).setErrorSource(PigException.BUG);
+                    }
+                } else { //else for if(object instanceof PigException)
+                    //it's not a PigException; create the exception with the message
+                    object = PigContext.instantiateFuncFromSpec(exceptionName + "(" + exceptionMessage + ")");
+                }
+
+                StackTraceElement[] steArr = new StackTraceElement[stackTraceElements.size()];
+                ((Throwable)object).setStackTrace((StackTraceElement[])(stackTraceElements.toArray(steArr)));
+
+                if(startingLineNum < (stackTraceLines.length - 1)) {
+                    Throwable e = getExceptionFromStrings(stackTraceLines, startingLineNum);
+                    ((Throwable)object).initCause(e);
+                }
+
+                return (Throwable)object;
+            } else { //else for if(exceptionName != null)
+                int errCode = 2055;
+                String msg = "Did not find exception name to create exception from string: " + java.util.Arrays.toString(stackTraceLines);
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+        } else { //else for if(lines.length > 0)
+            int errCode = 2056;
+            String msg = "Cannot create exception from empty string.";
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+    }
+    
+    /**
+     * Parses one stack frame line of a printed stack trace into a StackTraceElement.
+     * @param line a frame line as printed by {@link Throwable#printStackTrace() printStackTrace}, e.g. "at pkg.Outer$Inner.method(File.java:65)"
+     * @return the StackTraceElement for the frame; its line number is -1 when the frame has no ":line" part, e.g. "(Native Method)"
+     * @throws Exception an ExecException (error 2057) if no fully qualified method name is found; a NumberFormatException if the line number is not numeric
+     */
+    public StackTraceElement getStackTraceElement(String line) throws Exception{
+        /*
+         * the format of the line is something like:
+         *              at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map(PigMapOnly.java:65)
+         * note the white space before the 'at'. It's not of much importance but noted for posterity.
+         */
+        String[] items;
+
+        /*
+         * regex for matching the fully qualified method Name
+         * note the use of the $ for matching nested classes
+         * and the use of < and > for constructors
+         */
+        String qualifiedMethodNameRegex = "(\\w+(\\$\\w+)?\\.)+(<)?\\w+(>)?";
+        Pattern qualifiedMethodNamePattern = Pattern.compile(qualifiedMethodNameRegex);
+        Matcher contentMatcher = qualifiedMethodNamePattern.matcher(line);
+
+        //org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map(PigMapOnly.java:65)
+        String content = null;
+        if(contentMatcher.find()) {
+            content = line.substring(contentMatcher.start());
+        } else {
+            int errCode = 2057;
+            String msg = "Did not find fully qualified method name to reconstruct stack trace: " + line;
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+
+        Matcher qualifiedMethodNameMatcher = qualifiedMethodNamePattern.matcher(content);
+
+        //org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map
+        String qualifiedMethodName = null;
+        //(PigMapOnly.java:65)
+        String fileDetails = null;
+
+        if(qualifiedMethodNameMatcher.find()) {
+            qualifiedMethodName = qualifiedMethodNameMatcher.group();
+            fileDetails = content.substring(qualifiedMethodNameMatcher.end() + 1);
+        } else {
+            int errCode = 2057;
+            String msg = "Did not find fully qualified method name to reconstruct stack trace: " + line;
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+
+        //From the fully qualified method name, extract the declaring class and method name
+        items = qualifiedMethodName.split("\\.");
+
+        //initialize the declaringClass (to org in most cases)
+        String declaringClass = items[0];
+        //the last member is always the method name
+        String methodName = items[items.length - 1];
+
+        //concatenate the names by adding the dot (.) between the members till the penultimate member
+        for(int i = 1; i < items.length - 1; ++i) {
+            declaringClass += ".";
+            declaringClass += items[i];
+        }
+
+        //from the file details extract the file name and the line number
+        //PigMapOnly.java:65 (the trailing closing parenthesis is dropped first)
+        fileDetails = fileDetails.substring(0, fileDetails.length() - 1);
+        items = fileDetails.split(":");
+        //frames such as (Native Method) or (Unknown Source) have no ":line" part; -1 marks the line number as unavailable
+        String fileName = null;
+        int lineNumber = 0;
+        if(items.length > 0) {
+            fileName = items[0];
+            lineNumber = (items.length > 1) ? Integer.parseInt(items[1]) : -1;
+        }
+        return new StackTraceElement(declaringClass, methodName, fileName, lineNumber);
+    }
+
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Fri Feb 13 01:59:27 2009
@@ -56,7 +56,7 @@
             String grpName,
             PigContext pc) throws PlanException, VisitorException,
                                   IOException, ExecException,
-                                  JobCreationException {
+                                  JobCreationException, Exception {
         long sleepTime = 500;
         MROperPlan mrp = compile(php, pc);
         
@@ -95,7 +95,7 @@
             log.error("Map reduce job failed");
             for (Job fj : failedJobs) {
                 log.error(fj.getMessage());
-                getStats(fj, jobClient, true);
+                getStats(fj, jobClient, true, pc);
             }
             jc.stop(); 
             return false;
@@ -104,7 +104,7 @@
         List<Job> succJobs = jc.getSuccessfulJobs();
         if(succJobs!=null)
             for(Job job : succJobs){
-                getStats(job,jobClient, false);
+                getStats(job,jobClient, false, pc);
             }
 
         jc.stop(); 
@@ -166,7 +166,7 @@
     //A purely testing method. Not to be used elsewhere
     public boolean launchPigWithCombinePlan(PhysicalPlan php,
             String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException,
-            VisitorException, IOException, ExecException, JobCreationException {
+            VisitorException, IOException, ExecException, JobCreationException, Exception {
         long sleepTime = 500;
         MRCompiler comp = new MRCompiler(php, pc);
         comp.compile();
@@ -212,13 +212,13 @@
                 throw new ExecException(
                         "Something terribly wrong with Job Control.");
             for (Job job : failedJobs) {
-                getStats(job, jobClient, true);
+                getStats(job, jobClient, true, pc);
             }
         }
         List<Job> succJobs = jc.getSuccessfulJobs();
         if (succJobs != null)
             for (Job job : succJobs) {
-                getStats(job, jobClient, false);
+                getStats(job, jobClient, false, pc);
             }
 
         jc.stop();

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 13 01:59:27 2009
@@ -871,9 +871,9 @@
             curMROp.setFragment(op.getFragment());
             curMROp.setReplFiles(op.getReplFiles());
         }catch(Exception e){
-            VisitorException pe = new VisitorException(e.getMessage());
-            pe.initCause(e);
-            throw pe;
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
 
@@ -958,7 +958,7 @@
         }
     }
     
-    private int[] getSortCols(POSort sort) throws PlanException {
+    private int[] getSortCols(POSort sort) throws PlanException, ExecException {
         List<PhysicalPlan> plans = sort.getSortPlans();
         if(plans!=null){
             int[] ret = new int[plans.size()]; 
@@ -1384,29 +1384,35 @@
                         break;
                     }
                     
-                    // if input to project is the last input
-                    if (proj.getColumn() == pack.getNumInps())
-                    {
-                        // if we had already seen another project
-                        // which was also for the last input, then
-                        // we might be trying to flatten twice on the
-                        // last input in which case we can't optimize by
-                        // just streaming the tuple to those projects
-                        // IMPORTANT NOTE: THIS WILL NEED TO CHANGE WHEN WE
-                        // OPTIMIZE BUILTINS LIKE SUM() AND COUNT() TO
-                        // TAKE IN STREAMING INPUT
-                        if(projOfLastInput != null) {
-                            allSimple = false;
-                            break;
-                        }
-                        projOfLastInput = proj;
-                        // make sure the project is on a bag which needs to be
-                        // flattened
-                        if (!flatten.get(i) || proj.getResultType() != DataType.BAG)
+                    try {
+                        // if input to project is the last input
+                        if (proj.getColumn() == pack.getNumInps())
                         {
-                            lastInputFlattened = false;
-                            break;
+                            // if we had already seen another project
+                            // which was also for the last input, then
+                            // we might be trying to flatten twice on the
+                            // last input in which case we can't optimize by
+                            // just streaming the tuple to those projects
+                            // IMPORTANT NOTE: THIS WILL NEED TO CHANGE WHEN WE
+                            // OPTIMIZE BUILTINS LIKE SUM() AND COUNT() TO
+                            // TAKE IN STREAMING INPUT
+                            if(projOfLastInput != null) {
+                                allSimple = false;
+                                break;
+                            }
+                            projOfLastInput = proj;
+                            // make sure the project is on a bag which needs to be
+                            // flattened
+                            if (!flatten.get(i) || proj.getResultType() != DataType.BAG)
+                            {
+                                lastInputFlattened = false;
+                                break;
+                            }
                         }
+                    } catch (ExecException e) {
+                        int errCode = 2069;
+                        String msg = "Error during map reduce compilation. Problem in accessing column from project operator.";
+                        throw new MRCompilerException(msg, errCode, PigException.BUG, e);
                     }
                     
                     // if all deeper operators are all project
@@ -1577,14 +1583,14 @@
                 if (mpLeaves.size() != 1) {
                     int errCode = 2024; 
                     String msg = "Expected reduce to have single leaf. Found " + mpLeaves.size() + " leaves.";
-                    throw new PigException(msg, errCode, PigException.BUG);
+                    throw new MRCompilerException(msg, errCode, PigException.BUG);
                 }
                 PhysicalOperator mpLeaf = mpLeaves.get(0);
                 if (!(mpLeaf instanceof POStore)) {
                     int errCode = 2025;
                     String msg = "Expected leaf of reduce plan to " +
                         "always be POStore. Found " + mpLeaf.getClass().getSimpleName();
-                    throw new PigException(msg, errCode, PigException.BUG);
+                    throw new MRCompilerException(msg, errCode, PigException.BUG);
                 }
                 FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
                 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb 13 01:59:27 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.List;
@@ -27,6 +28,7 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -49,6 +51,10 @@
  */
 public class MapReduceLauncher extends Launcher{
     private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
+ 
+    //used to track the exception thrown by the job control which is run in a separate thread
+    private Exception jobControlException = null;
+    
     @Override
     public boolean launchPig(PhysicalPlan php,
                              String grpName,
@@ -56,7 +62,8 @@
                                                    VisitorException,
                                                    IOException,
                                                    ExecException,
-                                                   JobCreationException {
+                                                   JobCreationException,
+                                                   Exception {
         long sleepTime = 5000;
         MROperPlan mrp = compile(php, pc);
         
@@ -71,7 +78,12 @@
         
         int numMRJobs = jc.getWaitingJobs().size();
         
-        new Thread(jc).start();
+        //create the exception handler for the job control thread
+        //and register the handler with the job control thread
+        JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
+        Thread jcThread = new Thread(jc);
+        jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+        jcThread.start();
 
         double lastProg = -1;
         int perCom = 0;
@@ -87,13 +99,26 @@
             }
             lastProg = prog;
         }
+        
+        //check for the jobControlException first
+        //if the job controller fails before launching the jobs then there are
+        //no jobs to check for failure
+        if(jobControlException != null) {
+        	if(jobControlException instanceof PigException) {
+        		throw jobControlException;
+        	} else {
+	        	int errCode = 2117;
+	        	String msg = "Unexpected error when launching map reduce job.";        	
+	    		throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
+        	}
+        }
+        
         // Look to see if any jobs failed.  If so, we need to report that.
         List<Job> failedJobs = jc.getFailedJobs();
         if (failedJobs != null && failedJobs.size() > 0) {
             log.error("Map reduce job failed");
             for (Job fj : failedJobs) {
-                log.error(fj.getMessage());
-                getStats(fj, jobClient, true);
+                getStats(fj, jobClient, true, pc);
             }
             jc.stop(); 
             return false;
@@ -102,7 +127,7 @@
         List<Job> succJobs = jc.getSuccessfulJobs();
         if(succJobs!=null)
             for(Job job : succJobs){
-                getStats(job,jobClient, false);
+                getStats(job,jobClient, false, pc);
             }
 
         jc.stop(); 
@@ -160,5 +185,27 @@
         kdv.visit();
         return plan;
     }
+    
+    /**
+     * Handler for exceptions that escape the job control thread; launchPig registers it on that
+     * thread. It is an inner class, so it can record the failure in the enclosing launcher's
+     * jobControlException field. Unhandled exceptions in threads are handled by the VM if no
+     * handler is registered explicitly or if the default handler is null.
+     */
+    class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
+    	
+    	public void uncaughtException(Thread thread, Throwable throwable) {
+    		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    		PrintStream ps = new PrintStream(baos);
+    		throwable.printStackTrace(ps); // render the full stack trace into the in-memory buffer
+    		String exceptionString = baos.toString();    		
+    		try {	
+    			jobControlException = getExceptionFromString(exceptionString); // helper not shown in this diff; presumably rebuilds the exception from the trace text - confirm
+    		} catch (Exception e) {
+    			String errMsg = "Could not resolve error that occured when launching map reduce job.";
+    			jobControlException = new RuntimeException(errMsg, e); // still non-null, so launchPig's null check reports a failure
+    		}
+    	}
+    }
  
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Feb 13 01:59:27 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 
+import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -109,9 +110,9 @@
                     roots = cp.getRoots().toArray(new PhysicalOperator[1]);
                     leaf = cp.getLeaves().get(0);
                 }
-            } catch (IOException e) {
-                log.error(e.getMessage() + "was caused by:");
-                log.error(e.getCause().getMessage());
+            } catch (IOException ioe) {
+                String msg = "Problem while configuring combiner's reduce plan.";
+                throw new RuntimeException(msg, ioe);
             }
         }
         
@@ -195,13 +196,13 @@
                             continue;
                         
                         if(redRes.returnStatus==POStatus.STATUS_ERR){
+                            int errCode = 2090;
                             String msg = "Received Error while " +
                             "processing the combine plan.";
                             if(redRes.result != null) {
                                 msg += redRes.result;
                             }
-                            IOException ioe = new IOException(msg);
-                            throw ioe;
+                            throw new ExecException(msg, errCode, PigException.BUG);
                         }
                     }
                 }
@@ -210,8 +211,9 @@
                     return false;
                 
                 if(res.returnStatus==POStatus.STATUS_ERR){
-                    IOException ioe = new IOException("Packaging error while processing group");
-                    throw ioe;
+                    int errCode = 2091;
+                    String msg = "Packaging error while processing group.";
+                    throw new ExecException(msg, errCode, PigException.BUG);
                 }
                 
                 if(res.returnStatus==POStatus.STATUS_EOP) {
@@ -221,9 +223,7 @@
                 return false;    
                 
             } catch (ExecException e) {
-                IOException ioe = new IOException(e.getMessage());
-                ioe.initCause(e.getCause());
-                throw ioe;
+                throw e;
             }
 
         }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Feb 13 01:59:27 2009
@@ -39,9 +39,11 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.Slice;
 import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.PigSlicer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
@@ -100,7 +102,9 @@
     protected Path[] listPaths(JobConf job) throws IOException {
         Path[] dirs = FileInputFormat.getInputPaths(job);
         if (dirs.length == 0) {
-            throw new IOException("No input paths specified in job");
+            int errCode = 2092;
+            String msg = "No input paths specified in job.";
+            throw new ExecException(msg, errCode, PigException.BUG);
         }
         
         List<Path> result = new ArrayList<Path>();
@@ -176,37 +180,54 @@
      */
     public InputSplit[] getSplits(JobConf job, int numSplits)
             throws IOException {
-        ArrayList<Pair<FileSpec, Boolean>> inputs = (ArrayList<Pair<FileSpec, Boolean>>) ObjectSerializer
-                .deserialize(job.get("pig.inputs"));
-        ArrayList<ArrayList<OperatorKey>> inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
-                .deserialize(job.get("pig.inpTargets"));
-        PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job
-                .get("pig.pigContext"));
+        ArrayList<Pair<FileSpec, Boolean>> inputs;
+		ArrayList<ArrayList<OperatorKey>> inpTargets;
+		PigContext pigContext;
+		try {
+			inputs = (ArrayList<Pair<FileSpec, Boolean>>) ObjectSerializer
+			        .deserialize(job.get("pig.inputs"));
+			inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
+			        .deserialize(job.get("pig.inpTargets"));
+			pigContext = (PigContext) ObjectSerializer.deserialize(job
+			        .get("pig.pigContext"));
+		} catch (Exception e) {
+			int errCode = 2094;
+			String msg = "Unable to deserialize object.";
+			throw new ExecException(msg, errCode, PigException.BUG, e);
+		}
         
         ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
         for (int i = 0; i < inputs.size(); i++) {
-            Path path = new Path(inputs.get(i).first.getFileName());
-            FileSystem fs = path.getFileSystem(job);
-            // if the execution is against Mapred DFS, set
-            // working dir to /user/<userid>
-            if(pigContext.getExecType() == ExecType.MAPREDUCE)
-                fs.setWorkingDirectory(new Path("/user", job.getUser()));
-            
-            DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
-            ValidatingInputFileSpec spec;
-            if (inputs.get(i).first instanceof ValidatingInputFileSpec) {
-                spec = (ValidatingInputFileSpec) inputs.get(i).first;
-            } else {
-                spec = new ValidatingInputFileSpec(inputs.get(i).first, store);
-            }
-            boolean isSplittable = inputs.get(i).second;
-            if (isSplittable && (spec.getSlicer() instanceof PigSlicer)) {
-                ((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
-            }
-            Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());
-            for (Slice split : pigs) {
-                splits.add(new SliceWrapper(split, pigContext, i, fs, inpTargets.get(i)));
-            }
+            try {
+				Path path = new Path(inputs.get(i).first.getFileName());
+				FileSystem fs = path.getFileSystem(job);
+				// if the execution is against Mapred DFS, set
+				// working dir to /user/<userid>
+				if(pigContext.getExecType() == ExecType.MAPREDUCE)
+				    fs.setWorkingDirectory(new Path("/user", job.getUser()));
+				
+				DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
+				ValidatingInputFileSpec spec;
+				if (inputs.get(i).first instanceof ValidatingInputFileSpec) {
+				    spec = (ValidatingInputFileSpec) inputs.get(i).first;
+				} else {
+				    spec = new ValidatingInputFileSpec(inputs.get(i).first, store);
+				}
+				boolean isSplittable = inputs.get(i).second;
+				if (isSplittable && (spec.getSlicer() instanceof PigSlicer)) {
+				    ((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
+				}
+				Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());
+				for (Slice split : pigs) {
+				    splits.add(new SliceWrapper(split, pigContext, i, fs, inpTargets.get(i)));
+				}
+            } catch (ExecException ee) {
+            	throw ee;
+			} catch (Exception e) {
+				int errCode = 2118;
+				String msg = "Unable to create input slice for: " + inputs.get(i).first.getFileName();
+				throw new ExecException(msg, errCode, PigException.BUG, e);
+			}
         }
         return splits.toArray(new SliceWrapper[splits.size()]);
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Fri Feb 13 01:59:27 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
@@ -96,9 +97,7 @@
             try {
                 runPipeline(leaf);
             } catch (ExecException e) {
-                 IOException ioe = new IOException("Error running pipeline in close() of map");
-                 ioe.initCause(e);
-                 throw ioe;
+            	throw e;
             }
         }
         
@@ -151,9 +150,9 @@
                 roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
                 leaf = mp.getLeaves().get(0);
             }
-        } catch (IOException e) {
-            log.error(e.getMessage() + "was caused by:");
-            log.error(e.getCause().getMessage());
+        } catch (IOException ioe) {
+            String msg = "Problem while configuring map plan.";
+            throw new RuntimeException(msg, ioe);
         }
     }
     
@@ -183,9 +182,7 @@
             try{
                 collect(oc,inpTuple);
             } catch (ExecException e) {
-                IOException ioe = new IOException(e.getMessage());
-                ioe.initCause(e.getCause());
-                throw ioe;
+                throw e;
             }
             return;
         }
@@ -197,9 +194,7 @@
             runPipeline(leaf);
             
         } catch (ExecException e) {
-            IOException ioe = new IOException(e.getMessage());
-            ioe.initCause(e.getCause());
-            throw ioe;
+        	throw e;
         }
     }
 
@@ -232,8 +227,9 @@
                     "processing the map plan.";
                 }
                     
-                IOException ioe = new IOException(errMsg);
-                throw ioe;
+                int errCode = 2055;
+                ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
+                throw ee;
             }
         }
         

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Feb 13 01:59:27 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 
+import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -188,9 +189,9 @@
                     roots = rp.getRoots().toArray(new PhysicalOperator[1]);
                     leaf = rp.getLeaves().get(0);
                 }
-            } catch (IOException e) {
-                log.error(e.getMessage() + "was caused by:");
-                log.error(e.getCause().getMessage());
+            } catch (IOException ioe) {
+                String msg = "Problem while configuring reduce plan.";
+                throw new RuntimeException(msg, ioe);
             }
         }
         
@@ -258,8 +259,9 @@
                 }
                 
                 if(res.returnStatus==POStatus.STATUS_ERR){
-                    IOException ioe = new IOException("Packaging error while processing group");
-                    throw ioe;
+                    int errCode = 2093;
+                    String msg = "Encountered error in package operator while processing group.";
+                    throw new ExecException(msg, errCode, PigException.BUG);
                 }
                 
                 if(res.returnStatus==POStatus.STATUS_EOP) {
@@ -268,9 +270,7 @@
                     
                 return false;
             } catch (ExecException e) {
-                IOException ioe = new IOException(e.getMessage());
-                ioe.initCause(e.getCause());
-                throw ioe;
+                throw e;
             }
         }
         
@@ -300,17 +300,16 @@
                     // close() we can do the right thing
                     errorInReduce   = true;
                     // if there is an errmessage use it
-                    String errMsg;
+                    String msg;
                     if(redRes.result != null) {
-                        errMsg = "Received Error while " +
+                        msg = "Received Error while " +
                         "processing the reduce plan: " + redRes.result;
                     } else {
-                        errMsg = "Received Error while " +
+                        msg = "Received Error while " +
                         "processing the reduce plan.";
                     }
-                    
-                    IOException ioe = new IOException(errMsg);
-                    throw ioe;
+                    int errCode = 2090;
+                    throw new ExecException(msg, errCode, PigException.BUG);
                 }
             }
 
@@ -344,9 +343,7 @@
                 try {
                     runPipeline(leaf);
                 } catch (ExecException e) {
-                     IOException ioe = new IOException("Error running pipeline in close() of reduce");
-                     ioe.initCause(e);
-                     throw ioe;
+                     throw e;
                 }
             }
             
@@ -410,7 +407,7 @@
                 try {
                     key = HDataType.getWritableComparableTypes(t.get(0), keyType);
                 } catch (ExecException e) {
-                    throw WrappedIOException.wrap(e);
+                    throw e;
                 }
             }
             
@@ -440,15 +437,14 @@
                 }
                 
                 if(res.returnStatus==POStatus.STATUS_ERR){
-                    IOException ioe = new IOException("Packaging error while processing group");
-                    throw ioe;
+                    int errCode = 2093;
+                    String msg = "Encountered error in package operator while processing group.";
+                    throw new ExecException(msg, errCode, PigException.BUG);
                 }
                     
                 
             } catch (ExecException e) {
-                IOException ioe = new IOException(e.getMessage());
-                ioe.initCause(e.getCause());
-                throw ioe;
+                throw e;
             }
         }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Fri Feb 13 01:59:27 2009
@@ -30,7 +30,9 @@
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
+import org.apache.pig.PigException;
 import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -63,11 +65,9 @@
                 store = (StoreFunc) PigContext
                         .instantiateFuncFromSpec(storeFunc);
             } catch (Exception e) {
-                RuntimeException re = new RuntimeException(e.getClass()
-                        .getName()
-                        + ": " + e.getMessage());
-                re.setStackTrace(e.getStackTrace());
-                throw re;
+                int errCode = 2081;
+                String msg = "Unable to setup the store function.";
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Fri Feb 13 01:59:27 2009
@@ -54,9 +54,8 @@
             mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
-            mLog.error("Unable to deserialize pig.sortOrder " +
-                ioe.getMessage());
-            throw new RuntimeException(ioe);
+            String msg = "Unable to deserialize pig.sortOrder";
+            throw new RuntimeException(msg, ioe);
         }
         if (mAsc == null) {
             mAsc = new boolean[1];

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=743952&r1=743951&r2=743952&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Fri Feb 13 01:59:27 2009
@@ -40,8 +40,10 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
 import org.apache.pig.Slice;
 import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.PigSlice;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
@@ -183,8 +185,9 @@
         try {
             return ois.readObject();
         } catch (ClassNotFoundException cnfe) {
-            IOException newE = wrapException(cnfe);
-            throw newE;
+            int errCode = 2094;
+            String msg = "Unable to deserialize object.";
+            throw new ExecException(msg, errCode, PigException.BUG, cnfe);
         }
     }