You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ju...@apache.org on 2012/10/30 22:56:17 UTC

svn commit: r1403889 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/physicalLay...

Author: julien
Date: Tue Oct 30 21:56:16 2012
New Revision: 1403889

URL: http://svn.apache.org/viewvc?rev=1403889&view=rev
Log:
PIG-3004: Improve exceptions messages when a RuntimeException is raised in Physical Operators (julien)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct 30 21:56:16 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-3004: Improve exceptions messages when a RuntimeException is raised in Physical Operators (julien)
+
 PIG-2990: the -secretDebugCmd shouldn't be a secret and should just be...a command (jcoveney)
 
 PIG-2941: Ivy resolvers in pig don't have consistent chaining and don't have a kitchen sink option for novices (jgordon via azaroth)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Tue Oct 30 21:56:16 2012
@@ -329,31 +329,35 @@ public abstract class PhysicalOperator e
      */
     @SuppressWarnings("rawtypes")  // For legacy use of untemplatized Map.
     public Result getNext(Object obj, byte dataType) throws ExecException {
-        switch (dataType) {
-        case DataType.BAG:
-            return getNext((DataBag) obj);
-        case DataType.BOOLEAN:
-            return getNext((Boolean) obj);
-        case DataType.BYTEARRAY:
-            return getNext((DataByteArray) obj);
-        case DataType.CHARARRAY:
-            return getNext((String) obj);
-        case DataType.DOUBLE:
-            return getNext((Double) obj);
-        case DataType.FLOAT:
-            return getNext((Float) obj);
-        case DataType.INTEGER:
-            return getNext((Integer) obj);
-        case DataType.LONG:
-            return getNext((Long) obj);
-        case DataType.DATETIME:
-            return getNext((DateTime) obj);
-        case DataType.MAP:
-            return getNext((Map) obj);
-        case DataType.TUPLE:
-            return getNext((Tuple) obj);
-        default:
-            throw new ExecException("Unsupported type for getNext: " + DataType.findTypeName(dataType));
+        try {
+            switch (dataType) {
+            case DataType.BAG:
+                return getNext((DataBag) obj);
+            case DataType.BOOLEAN:
+                return getNext((Boolean) obj);
+            case DataType.BYTEARRAY:
+                return getNext((DataByteArray) obj);
+            case DataType.CHARARRAY:
+                return getNext((String) obj);
+            case DataType.DOUBLE:
+                return getNext((Double) obj);
+            case DataType.FLOAT:
+                return getNext((Float) obj);
+            case DataType.INTEGER:
+                return getNext((Integer) obj);
+            case DataType.LONG:
+                return getNext((Long) obj);
+            case DataType.DATETIME:
+                return getNext((DateTime) obj);
+            case DataType.MAP:
+                return getNext((Map) obj);
+            case DataType.TUPLE:
+                return getNext((Tuple) obj);
+            default:
+                throw new ExecException("Unsupported type for getNext: " + DataType.findTypeName(dataType));
+            }
+        } catch (RuntimeException e) {
+            throw new ExecException("Exception while executing " + this.toString() + ": " + e.toString(), e);
         }
     }
 
@@ -401,7 +405,7 @@ public abstract class PhysicalOperator e
     public Result getNext(Float f) throws ExecException {
         return res;
     }
-    
+
     public Result getNext(DateTime dt) throws ExecException {
         return res;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Tue Oct 30 21:56:16 2012
@@ -61,6 +61,7 @@ public class EqualToExpr extends BinaryC
 
     @Override
     public Result getNext(Boolean bool) throws ExecException {
+      try {
         Result left, right;
 
         switch (operandType) {
@@ -92,6 +93,9 @@ public class EqualToExpr extends BinaryC
         }
 
         }
+      } catch (RuntimeException e) {
+          throw new ExecException("exception while executing " + this.toString() + ": " + e.toString(), 2067, PigException.BUG, e);
+      }
     }
 
     @SuppressWarnings("unchecked")

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java Tue Oct 30 21:56:16 2012
@@ -60,7 +60,7 @@ public abstract class ExpressionOperator
     public void setIllustrator(Illustrator illustrator) {
         this.illustrator = illustrator;
     }
-    
+
     @Override
     public boolean supportsMultipleOutputs() {
         return false;
@@ -225,4 +225,9 @@ public abstract class ExpressionOperator
     protected Result accumChild(List<ExpressionOperator> child, DataBag db) throws ExecException {
         return accumChild(child, db, DataType.BAG);
     }
+
+    @Override
+    public String toString() {
+        return "[" + this.getClass().getSimpleName() + " " + super.toString() + " children: " + getChildExpressions() + " at " + getOriginalLocations() + "]";
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Tue Oct 30 21:56:16 2012
@@ -206,100 +206,104 @@ public class POForEach extends PhysicalO
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        Result res = null;
-        Result inp = null;
-        //The nested plan is under processing
-        //So return tuples that the generate oper
-        //returns
-        if(processingPlan){
-            while(true) {
-                res = processPlan();
-
-                if(res.returnStatus==POStatus.STATUS_OK) {
-                    return res;
-                }
-                if(res.returnStatus==POStatus.STATUS_EOP) {
-                    processingPlan = false;
-                    for(PhysicalPlan plan : inputPlans) {
-                        plan.detachInput();
+        try {
+            Result res = null;
+            Result inp = null;
+            //The nested plan is under processing
+            //So return tuples that the generate oper
+            //returns
+            if(processingPlan){
+                while(true) {
+                    res = processPlan();
+
+                    if(res.returnStatus==POStatus.STATUS_OK) {
+                        return res;
+                    }
+                    if(res.returnStatus==POStatus.STATUS_EOP) {
+                        processingPlan = false;
+                        for(PhysicalPlan plan : inputPlans) {
+                            plan.detachInput();
+                        }
+                        break;
+                    }
+                    if(res.returnStatus==POStatus.STATUS_ERR) {
+                        return res;
+                    }
+                    if(res.returnStatus==POStatus.STATUS_NULL) {
+                        continue;
                     }
-                    break;
                 }
-                if(res.returnStatus==POStatus.STATUS_ERR) {
-                    return res;
+            }
+            //The nested plan processing is done or is
+            //yet to begin. So process the input and start
+            //nested plan processing on the input tuple
+            //read
+            while (true) {
+                inp = processInput();
+                if (inp.returnStatus == POStatus.STATUS_EOP ||
+                        inp.returnStatus == POStatus.STATUS_ERR) {
+                    return inp;
                 }
-                if(res.returnStatus==POStatus.STATUS_NULL) {
+                if (inp.returnStatus == POStatus.STATUS_NULL) {
                     continue;
                 }
-            }
-        }
-        //The nested plan processing is done or is
-        //yet to begin. So process the input and start
-        //nested plan processing on the input tuple
-        //read
-        while (true) {
-            inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_EOP ||
-                    inp.returnStatus == POStatus.STATUS_ERR) {
-                return inp;
-            }
-            if (inp.returnStatus == POStatus.STATUS_NULL) {
-                continue;
-            }
-
-            attachInputToPlans((Tuple) inp.result);
-            inpTuple = (Tuple)inp.result;
-
-            for (PhysicalOperator po : opsToBeReset) {
-                po.reset();
-            }
-
-            if (isAccumulative()) {
-                for(int i=0; i<inpTuple.size(); i++) {
-                    if (inpTuple.getType(i) == DataType.BAG) {
-                        // we only need to check one bag, because all the bags
-                        // share the same buffer
-                        buffer = ((AccumulativeBag)inpTuple.get(i)).getTuplebuffer();
-                        break;
-                    }
+
+                attachInputToPlans((Tuple) inp.result);
+                inpTuple = (Tuple)inp.result;
+
+                for (PhysicalOperator po : opsToBeReset) {
+                    po.reset();
                 }
 
-                setAccumStart();
-                while(true) {
-                    if (!isEarlyTerminated() && buffer.hasNextBatch()) {
-                        try {
-                            buffer.nextBatch();
-                        }catch(IOException e) {
-                            throw new ExecException(e);
+                if (isAccumulative()) {
+                    for(int i=0; i<inpTuple.size(); i++) {
+                        if (inpTuple.getType(i) == DataType.BAG) {
+                            // we only need to check one bag, because all the bags
+                            // share the same buffer
+                            buffer = ((AccumulativeBag)inpTuple.get(i)).getTuplebuffer();
+                            break;
                         }
-                    }else{
-                        inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
- //                       buffer.clear();
-                        setAccumEnd();
                     }
 
-                    res = processPlan();
+                    setAccumStart();
+                    while(true) {
+                        if (!isEarlyTerminated() && buffer.hasNextBatch()) {
+                            try {
+                                buffer.nextBatch();
+                            }catch(IOException e) {
+                                throw new ExecException(e);
+                            }
+                        }else{
+                            inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
+                            //                       buffer.clear();
+                            setAccumEnd();
+                        }
 
-                    if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
-                        // attach same input again to process next batch
-                        attachInputToPlans((Tuple) inp.result);
-                    } else if (res.returnStatus == POStatus.STATUS_EARLY_TERMINATION) {
-                        //if this bubbled up, then we just need to pass a null value through the pipe
-                        //so that POUserFunc will properly return the values
-                        attachInputToPlans(null);
-                        earlyTerminate();
-                    } else {
-                        break;
+                        res = processPlan();
+
+                        if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
+                            // attach same input again to process next batch
+                            attachInputToPlans((Tuple) inp.result);
+                        } else if (res.returnStatus == POStatus.STATUS_EARLY_TERMINATION) {
+                            //if this bubbled up, then we just need to pass a null value through the pipe
+                            //so that POUserFunc will properly return the values
+                            attachInputToPlans(null);
+                            earlyTerminate();
+                        } else {
+                            break;
+                        }
                     }
-                }
 
-            } else {
-                res = processPlan();
-            }
+                } else {
+                    res = processPlan();
+                }
 
-            processingPlan = true;
+                processingPlan = true;
 
-            return res;
+                return res;
+            }
+        } catch (RuntimeException e) {
+            throw new ExecException("Error while executing ForEach at " + this.getOriginalLocations(), e);
         }
     }