You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ju...@apache.org on 2012/10/30 22:56:17 UTC
svn commit: r1403889 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
Author: julien
Date: Tue Oct 30 21:56:16 2012
New Revision: 1403889
URL: http://svn.apache.org/viewvc?rev=1403889&view=rev
Log:
PIG-3004: Improve exceptions messages when a RuntimeException is raised in Physical Operators (julien)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct 30 21:56:16 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-3004: Improve exceptions messages when a RuntimeException is raised in Physical Operators (julien)
+
PIG-2990: the -secretDebugCmd shouldn't be a secret and should just be...a command (jcoveney)
PIG-2941: Ivy resolvers in pig don't have consistent chaining and don't have a kitchen sink option for novices (jgordon via azaroth)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Tue Oct 30 21:56:16 2012
@@ -329,31 +329,35 @@ public abstract class PhysicalOperator e
*/
@SuppressWarnings("rawtypes") // For legacy use of untemplatized Map.
public Result getNext(Object obj, byte dataType) throws ExecException {
- switch (dataType) {
- case DataType.BAG:
- return getNext((DataBag) obj);
- case DataType.BOOLEAN:
- return getNext((Boolean) obj);
- case DataType.BYTEARRAY:
- return getNext((DataByteArray) obj);
- case DataType.CHARARRAY:
- return getNext((String) obj);
- case DataType.DOUBLE:
- return getNext((Double) obj);
- case DataType.FLOAT:
- return getNext((Float) obj);
- case DataType.INTEGER:
- return getNext((Integer) obj);
- case DataType.LONG:
- return getNext((Long) obj);
- case DataType.DATETIME:
- return getNext((DateTime) obj);
- case DataType.MAP:
- return getNext((Map) obj);
- case DataType.TUPLE:
- return getNext((Tuple) obj);
- default:
- throw new ExecException("Unsupported type for getNext: " + DataType.findTypeName(dataType));
+ try {
+ switch (dataType) {
+ case DataType.BAG:
+ return getNext((DataBag) obj);
+ case DataType.BOOLEAN:
+ return getNext((Boolean) obj);
+ case DataType.BYTEARRAY:
+ return getNext((DataByteArray) obj);
+ case DataType.CHARARRAY:
+ return getNext((String) obj);
+ case DataType.DOUBLE:
+ return getNext((Double) obj);
+ case DataType.FLOAT:
+ return getNext((Float) obj);
+ case DataType.INTEGER:
+ return getNext((Integer) obj);
+ case DataType.LONG:
+ return getNext((Long) obj);
+ case DataType.DATETIME:
+ return getNext((DateTime) obj);
+ case DataType.MAP:
+ return getNext((Map) obj);
+ case DataType.TUPLE:
+ return getNext((Tuple) obj);
+ default:
+ throw new ExecException("Unsupported type for getNext: " + DataType.findTypeName(dataType));
+ }
+ } catch (RuntimeException e) {
+ throw new ExecException("Exception while executing " + this.toString() + ": " + e.toString(), e);
}
}
@@ -401,7 +405,7 @@ public abstract class PhysicalOperator e
public Result getNext(Float f) throws ExecException {
return res;
}
-
+
public Result getNext(DateTime dt) throws ExecException {
return res;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Tue Oct 30 21:56:16 2012
@@ -61,6 +61,7 @@ public class EqualToExpr extends BinaryC
@Override
public Result getNext(Boolean bool) throws ExecException {
+ try {
Result left, right;
switch (operandType) {
@@ -92,6 +93,9 @@ public class EqualToExpr extends BinaryC
}
}
+ } catch (RuntimeException e) {
+ throw new ExecException("exception while executing " + this.toString() + ": " + e.toString(), 2067, PigException.BUG, e);
+ }
}
@SuppressWarnings("unchecked")
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java Tue Oct 30 21:56:16 2012
@@ -60,7 +60,7 @@ public abstract class ExpressionOperator
public void setIllustrator(Illustrator illustrator) {
this.illustrator = illustrator;
}
-
+
@Override
public boolean supportsMultipleOutputs() {
return false;
@@ -225,4 +225,9 @@ public abstract class ExpressionOperator
protected Result accumChild(List<ExpressionOperator> child, DataBag db) throws ExecException {
return accumChild(child, db, DataType.BAG);
}
+
+ @Override
+ public String toString() {
+ return "[" + this.getClass().getSimpleName() + " " + super.toString() + " children: " + getChildExpressions() + " at " + getOriginalLocations() + "]";
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1403889&r1=1403888&r2=1403889&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Tue Oct 30 21:56:16 2012
@@ -206,100 +206,104 @@ public class POForEach extends PhysicalO
*/
@Override
public Result getNext(Tuple t) throws ExecException {
- Result res = null;
- Result inp = null;
- //The nested plan is under processing
- //So return tuples that the generate oper
- //returns
- if(processingPlan){
- while(true) {
- res = processPlan();
-
- if(res.returnStatus==POStatus.STATUS_OK) {
- return res;
- }
- if(res.returnStatus==POStatus.STATUS_EOP) {
- processingPlan = false;
- for(PhysicalPlan plan : inputPlans) {
- plan.detachInput();
+ try {
+ Result res = null;
+ Result inp = null;
+ //The nested plan is under processing
+ //So return tuples that the generate oper
+ //returns
+ if(processingPlan){
+ while(true) {
+ res = processPlan();
+
+ if(res.returnStatus==POStatus.STATUS_OK) {
+ return res;
+ }
+ if(res.returnStatus==POStatus.STATUS_EOP) {
+ processingPlan = false;
+ for(PhysicalPlan plan : inputPlans) {
+ plan.detachInput();
+ }
+ break;
+ }
+ if(res.returnStatus==POStatus.STATUS_ERR) {
+ return res;
+ }
+ if(res.returnStatus==POStatus.STATUS_NULL) {
+ continue;
}
- break;
}
- if(res.returnStatus==POStatus.STATUS_ERR) {
- return res;
+ }
+ //The nested plan processing is done or is
+ //yet to begin. So process the input and start
+ //nested plan processing on the input tuple
+ //read
+ while (true) {
+ inp = processInput();
+ if (inp.returnStatus == POStatus.STATUS_EOP ||
+ inp.returnStatus == POStatus.STATUS_ERR) {
+ return inp;
}
- if(res.returnStatus==POStatus.STATUS_NULL) {
+ if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
- }
- }
- //The nested plan processing is done or is
- //yet to begin. So process the input and start
- //nested plan processing on the input tuple
- //read
- while (true) {
- inp = processInput();
- if (inp.returnStatus == POStatus.STATUS_EOP ||
- inp.returnStatus == POStatus.STATUS_ERR) {
- return inp;
- }
- if (inp.returnStatus == POStatus.STATUS_NULL) {
- continue;
- }
-
- attachInputToPlans((Tuple) inp.result);
- inpTuple = (Tuple)inp.result;
-
- for (PhysicalOperator po : opsToBeReset) {
- po.reset();
- }
-
- if (isAccumulative()) {
- for(int i=0; i<inpTuple.size(); i++) {
- if (inpTuple.getType(i) == DataType.BAG) {
- // we only need to check one bag, because all the bags
- // share the same buffer
- buffer = ((AccumulativeBag)inpTuple.get(i)).getTuplebuffer();
- break;
- }
+
+ attachInputToPlans((Tuple) inp.result);
+ inpTuple = (Tuple)inp.result;
+
+ for (PhysicalOperator po : opsToBeReset) {
+ po.reset();
}
- setAccumStart();
- while(true) {
- if (!isEarlyTerminated() && buffer.hasNextBatch()) {
- try {
- buffer.nextBatch();
- }catch(IOException e) {
- throw new ExecException(e);
+ if (isAccumulative()) {
+ for(int i=0; i<inpTuple.size(); i++) {
+ if (inpTuple.getType(i) == DataType.BAG) {
+ // we only need to check one bag, because all the bags
+ // share the same buffer
+ buffer = ((AccumulativeBag)inpTuple.get(i)).getTuplebuffer();
+ break;
}
- }else{
- inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
- // buffer.clear();
- setAccumEnd();
}
- res = processPlan();
+ setAccumStart();
+ while(true) {
+ if (!isEarlyTerminated() && buffer.hasNextBatch()) {
+ try {
+ buffer.nextBatch();
+ }catch(IOException e) {
+ throw new ExecException(e);
+ }
+ }else{
+ inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
+ // buffer.clear();
+ setAccumEnd();
+ }
- if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
- // attach same input again to process next batch
- attachInputToPlans((Tuple) inp.result);
- } else if (res.returnStatus == POStatus.STATUS_EARLY_TERMINATION) {
- //if this bubbled up, then we just need to pass a null value through the pipe
- //so that POUserFunc will properly return the values
- attachInputToPlans(null);
- earlyTerminate();
- } else {
- break;
+ res = processPlan();
+
+ if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
+ // attach same input again to process next batch
+ attachInputToPlans((Tuple) inp.result);
+ } else if (res.returnStatus == POStatus.STATUS_EARLY_TERMINATION) {
+ //if this bubbled up, then we just need to pass a null value through the pipe
+ //so that POUserFunc will properly return the values
+ attachInputToPlans(null);
+ earlyTerminate();
+ } else {
+ break;
+ }
}
- }
- } else {
- res = processPlan();
- }
+ } else {
+ res = processPlan();
+ }
- processingPlan = true;
+ processingPlan = true;
- return res;
+ return res;
+ }
+ } catch (RuntimeException e) {
+ throw new ExecException("Error while executing ForEach at " + this.getOriginalLocations(), e);
}
}