You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/15 21:13:42 UTC
svn commit: r656795 [2/3] - in /incubator/pig/branches/types: ./
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/schema/
src/org/apache/pig/impl/logicalLayer/validators/
src/org/apache/pig/impl/plan/ test/org/apache/pig/test/
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=656795&r1=656794&r2=656795&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu May 15 12:13:41 2008
@@ -2,10 +2,8 @@
import java.util.Iterator;
import java.util.List ;
-import java.util.ArrayList;
-import java.io.IOException;
+import java.util.ArrayList;
-import org.apache.hadoop.mapred.lib.FieldSelectionMapReduce;
import org.apache.pig.impl.logicalLayer.LOConst;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -14,12 +12,9 @@
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.PlanWalker;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType ;
import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.*;
import org.apache.pig.data.DataType ;
import org.apache.commons.logging.Log;
@@ -42,10 +37,11 @@
super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
msgCollector = messageCollector ;
}
-
+
+ // Just in case caller is lazy
@Override
protected void visit(ExpressionOperator eOp)
- throws VisitorException {
+ throws VisitorException {
if (eOp instanceof BinaryExpressionOperator) {
visit((BinaryExpressionOperator) eOp) ;
}
@@ -61,8 +57,67 @@
else if (eOp instanceof LOCast) {
visit((LOCast) eOp) ;
}
- // TODO: A lot more "else" here for all LOs
+ else if (eOp instanceof LORegexp) {
+ visit((LORegexp) eOp) ;
+ }
+ else if (eOp instanceof LOUserFunc) {
+ visit((LOUserFunc) eOp) ;
+ }
+ else if (eOp instanceof LOProject) {
+ visit((LOProject) eOp) ;
+ }
+ else if (eOp instanceof LONegative) {
+ visit((LONegative) eOp) ;
+ }
+ else if (eOp instanceof LONot) {
+ visit((LONot) eOp) ;
+ }
+ // TODO: Check that all operators are included here
+ }
+
+
+ // Just in case caller is lazy
+ @Override
+ protected void visit(LogicalOperator lOp)
+ throws VisitorException {
+ if (lOp instanceof LOLoad) {
+ visit((LOLoad) lOp) ;
+ }
+ else if (lOp instanceof LODistinct) {
+ visit((LODistinct) lOp) ;
+ }
+ else if (lOp instanceof LOFilter) {
+ visit((LOFilter) lOp) ;
+ }
+ else if (lOp instanceof LOUnion) {
+ visit((LOUnion) lOp) ;
+ }
+ else if (lOp instanceof LOSplit) {
+ visit((LOSplit) lOp) ;
+ }
+ else if (lOp instanceof LOSplitOutput) {
+ visit((LOSplitOutput) lOp) ;
+ }
+ else if (lOp instanceof LOCogroup) {
+ visit((LOCogroup) lOp) ;
+ }
+ else if (lOp instanceof LOSort) {
+ visit((LOSort) lOp) ;
+ }
+ // TODO: Check that all operators are included here
+ }
+
+
+ protected void visit(LOProject pj) throws VisitorException {
+ try {
+ pj.getFieldSchema() ;
+ }
+ catch (FrontendException fe) {
+ VisitorException vse = new VisitorException("Problem in LOProject") ;
+ vse.initCause(fe) ;
+ throw vse ;
+ }
}
/**
@@ -70,10 +125,54 @@
* in the parsing stage so we don't need any logic here
*/
@Override
- protected void visit(LOConst cs) {
-
+ protected void visit(LOConst cs)
+ throws VisitorException {
+
}
-
+
+ /**
+ * LORegexp expects CharArray as input
+ * Itself always returns Boolean
+ * @param rg
+ */
+ @Override
+ protected void visit(LORegexp rg)
+ throws VisitorException {
+
+ // We allow BYTEARRAY to be converted to CHARARRAY
+ if (rg.getOperand().getType() == DataType.BYTEARRAY)
+ {
+ insertCastForRegexp(rg) ;
+ }
+
+ // Other than that if it's not CharArray just say goodbye
+ if (rg.getOperand().getType() != DataType.CHARARRAY)
+ {
+ String msg = "Operand of Regex can be CharArray only" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
+ }
+ }
+
+ private void insertCastForRegexp(LORegexp rg) {
+ LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
+ collectCastWarning(rg, DataType.BYTEARRAY, DataType.CHARARRAY) ;
+ OperatorKey newKey = genNewOperatorKey(rg) ;
+ LOCast cast = new LOCast(currentPlan, newKey, rg.getOperand(), DataType.CHARARRAY) ;
+ currentPlan.add(cast) ;
+ currentPlan.disconnect(rg.getOperand(), rg) ;
+ try {
+ currentPlan.connect(rg.getOperand(), cast) ;
+ currentPlan.connect(cast, rg) ;
+ }
+ catch (PlanException ioe) {
+ AssertionError err = new AssertionError("Explicit casting insertion") ;
+ err.initCause(ioe) ;
+ throw err ;
+ }
+ rg.setOperand(cast) ;
+ }
+
/**
* For all binary operators
*/
@@ -97,7 +196,9 @@
if ( (lhsType != DataType.BOOLEAN) ||
(rhsType != DataType.BOOLEAN) ) {
- throw new VisitorException("Operands of AND/OR can be boolean only") ;
+ String msg = "Operands of AND/OR can be boolean only" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
}
binOp.setType(DataType.BOOLEAN) ;
@@ -112,44 +213,46 @@
else if ( (binOp instanceof LOMultiply) ||
(binOp instanceof LODivide) ) {
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
-
- // return the bigger type
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+ if ( DataType.isNumberType(lhsType) &&
+ DataType.isNumberType(rhsType) ) {
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
- binOp.setType(biggerType) ;
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(rhsType)) ) {
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- // Set output type
- binOp.setType(rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(lhsType)) ) {
- insertRightCastForBinaryOp(binOp, lhsType) ;
- // Set output type
- binOp.setType(lhsType) ;
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // Cast both operands to double
- insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
- insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
- // Set output type
- binOp.setType(DataType.DOUBLE) ;
- }
- else {
- throw new VisitorException("Cannot evaluate output type of Mul/Div Operator") ;
- }
+ // return the bigger type
+ byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+ // Cast smaller type to the bigger type
+ if (lhsType != biggerType) {
+ insertLeftCastForBinaryOp(binOp, biggerType) ;
+ }
+ else if (rhsType != biggerType) {
+ insertRightCastForBinaryOp(binOp, biggerType) ;
+ }
+ binOp.setType(biggerType) ;
+ }
+ else if ( (lhsType == DataType.BYTEARRAY) &&
+ (DataType.isNumberType(rhsType)) ) {
+ insertLeftCastForBinaryOp(binOp, rhsType) ;
+ // Set output type
+ binOp.setType(rhsType) ;
+ }
+ else if ( (rhsType == DataType.BYTEARRAY) &&
+ (DataType.isNumberType(lhsType)) ) {
+ insertRightCastForBinaryOp(binOp, lhsType) ;
+ // Set output type
+ binOp.setType(lhsType) ;
+ }
+ else if ( (lhsType == DataType.BYTEARRAY) &&
+ (rhsType == DataType.BYTEARRAY) ) {
+ // Cast both operands to double
+ insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
+ insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
+ // Set output type
+ binOp.setType(DataType.DOUBLE) ;
+ }
+ else {
+ String msg = "Cannot evaluate output type of Mul/Div Operator" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
+ }
}
/**
@@ -197,7 +300,9 @@
binOp.setType(DataType.DOUBLE) ;
}
else {
- throw new VisitorException("Cannot evaluate output type of Add/Subtract Operator") ;
+ String msg = "Cannot evaluate output type of Add/Subtract Operator" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
}
}
@@ -248,7 +353,10 @@
insertRightCastForBinaryOp(binOp, lhsType) ;
}
else {
- throw new VisitorException("Cannot evaluate output type of Inequality Operator") ;
+ throw new VisitorException("Cannot evaluate output type of "
+ + binOp.getClass().getSimpleName()
+ + " LHS:" + DataType.findTypeName(lhsType)
+ + " RHS:" + DataType.findTypeName(rhsType)) ;
}
binOp.setType(DataType.BOOLEAN) ;
@@ -310,7 +418,9 @@
// good
}
else {
- throw new VisitorException("Cannot evaluate output type of Equal/NotEqual Operator") ;
+ String msg = "Cannot evaluate output type of Equal/NotEqual Operator" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
}
binOp.setType(DataType.BOOLEAN) ;
@@ -340,7 +450,9 @@
binOp.setType(rhsType) ;
}
else {
- throw new VisitorException("Cannot evaluate output type of Mod Operator") ;
+ String msg = "Cannot evaluate output type of Mod Operator" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
}
}
@@ -348,13 +460,17 @@
private void insertLeftCastForBinaryOp(BinaryExpressionOperator binOp,
byte toType ) {
+ LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
+ collectCastWarning(binOp,
+ binOp.getLhsOperand().getType(),
+ toType) ;
OperatorKey newKey = genNewOperatorKey(binOp) ;
- LOCast cast = new LOCast(mPlan, newKey, binOp.getLhsOperand(), toType) ;
- mPlan.add(cast) ;
- mPlan.disconnect(binOp.getLhsOperand(), binOp) ;
+ LOCast cast = new LOCast(currentPlan, newKey, binOp.getLhsOperand(), toType) ;
+ currentPlan.add(cast) ;
+ currentPlan.disconnect(binOp.getLhsOperand(), binOp) ;
try {
- mPlan.connect(binOp.getLhsOperand(), cast) ;
- mPlan.connect(cast, binOp) ;
+ currentPlan.connect(binOp.getLhsOperand(), cast) ;
+ currentPlan.connect(cast, binOp) ;
}
catch (PlanException ioe) {
AssertionError err = new AssertionError("Explicit casting insertion") ;
@@ -366,20 +482,24 @@
private void insertRightCastForBinaryOp(BinaryExpressionOperator binOp,
byte toType ) {
+ LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
+ collectCastWarning(binOp,
+ binOp.getRhsOperand().getType(),
+ toType) ;
OperatorKey newKey = genNewOperatorKey(binOp) ;
- LOCast cast = new LOCast(mPlan, newKey, binOp.getRhsOperand(), toType) ;
- mPlan.add(cast) ;
- mPlan.disconnect(binOp.getRhsOperand(), binOp) ;
+ LOCast cast = new LOCast(currentPlan, newKey, binOp.getRhsOperand(), toType) ;
+ currentPlan.add(cast) ;
+ currentPlan.disconnect(binOp.getRhsOperand(), binOp) ;
try {
- mPlan.connect(binOp.getRhsOperand(), cast) ;
- mPlan.connect(cast, binOp) ;
+ currentPlan.connect(binOp.getRhsOperand(), cast) ;
+ currentPlan.connect(cast, binOp) ;
}
catch (PlanException ioe) {
AssertionError err = new AssertionError("Explicit casting insertion") ;
err.initCause(ioe) ;
throw err ;
}
- binOp.setRhsOperand(cast) ;
+ binOp.setRhsOperand(cast) ;
}
/**
@@ -399,7 +519,9 @@
uniOp.setType(DataType.DOUBLE) ;
}
else {
- throw new VisitorException("NEG can be used with numbers or Bytearray only") ;
+ String msg = "NEG can be used with numbers or Bytearray only" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
}
}
else if (uniOp instanceof LONot) {
@@ -407,7 +529,9 @@
uniOp.setType(DataType.BOOLEAN) ;
}
else {
- throw new VisitorException("NOT can be used with boolean only") ;
+ String msg = "NOT can be used with boolean only" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
}
}
else {
@@ -418,35 +542,55 @@
}
private void insertCastForUniOp(UnaryExpressionOperator uniOp, byte toType) {
- List<LogicalOperator> list = mPlan.getPredecessors(uniOp) ;
+ collectCastWarning(uniOp,
+ uniOp.getOperand().getType(),
+ toType) ;
+ LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
+ List<LogicalOperator> list = currentPlan.getPredecessors(uniOp) ;
if (list==null) {
throw new AssertionError("No input for " + uniOp.getClass()) ;
}
// All uniOps at the moment only work with Expression input
ExpressionOperator input = (ExpressionOperator) list.get(0) ;
OperatorKey newKey = genNewOperatorKey(uniOp) ;
- LOCast cast = new LOCast(mPlan, newKey, input, toType) ;
+ LOCast cast = new LOCast(currentPlan, newKey, input, toType) ;
- mPlan.disconnect(input, uniOp) ;
+ currentPlan.disconnect(input, uniOp) ;
try {
- mPlan.connect(input, cast) ;
- mPlan.connect(cast, uniOp) ;
+ currentPlan.connect(input, cast) ;
+ currentPlan.connect(cast, uniOp) ;
}
catch (PlanException ioe) {
AssertionError err = new AssertionError("Explicit casting insertion") ;
err.initCause(ioe) ;
throw err ;
- }
+ }
+
}
- // TODO: NOT DONE YET because LOUserFunc itself is not done!!!
+ // Currently there is no input type information support in UserFunc
+ // So we can only check that none of the inputs has an unusable type
@Override
protected void visit(LOUserFunc func) throws VisitorException {
- // Visit each of the arguments
- Iterator<ExpressionOperator> i = func.getArguments().iterator();
- while (i.hasNext()) {
- i.next().visit(this);
+
+ List<ExpressionOperator> list = func.getArguments() ;
+
+ // If the dependency graph is right, all the inputs
+ // must already know the types
+
+ for(ExpressionOperator op: list) {
+ if (!DataType.isUsableType(op.getType())) {
+ String msg = "Problem with input of User-defined function" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
+ }
}
+
+ /*
+ while (iterator.hasNext()) {
+ iterator.next().visit(this);
+ }
+ */
}
/**
@@ -458,7 +602,9 @@
// high-level type checking
if (binCond.getCond().getType() != DataType.BOOLEAN) {
- throw new VisitorException("Condition in BinCond must be boolean") ;
+ String msg = "Condition in BinCond must be boolean" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
}
byte lhsType = binCond.getLhsOp().getType() ;
@@ -476,14 +622,18 @@
binCond.setType(biggerType) ;
}
else if (lhsType != rhsType) {
- throw new VisitorException("Two inputs of BinCond do not have compatible types") ;
+ String msg = "Two inputs of BinCond do not have compatible types" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
}
// Matching schemas if we're working with tuples
else if (lhsType == DataType.TUPLE) {
try {
if (!binCond.getLhsOp().getSchema().equals(binCond.getRhsOp().getSchema())) {
- throw new VisitorException("Two inputs of BinCond must have compatible schemas") ;
+ String msg = "Two inputs of BinCond must have compatible schemas" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
}
// TODO: We may have to merge the schema here
// if the previous check is not exact match
@@ -492,13 +642,17 @@
binCond.setSchema(binCond.getLhsOp().getSchema()) ;
}
catch (ParseException pe) {
- VisitorException vse = new VisitorException("Problem during setting BinCond output schema") ;
+ String msg = "Problem during setting BinCond output schema" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
vse.initCause(pe) ;
throw vse ;
}
}
catch (FrontendException ioe) {
- VisitorException vse = new VisitorException("Problem during evaluating BinCond output type") ;
+ String msg = "Problem during evaluating BinCond output type" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
vse.initCause(ioe) ;
throw vse ;
}
@@ -511,19 +665,27 @@
binCond.setType(DataType.CHARARRAY) ;
}
else {
- throw new VisitorException("Unsupported input type for BinCond") ;
+ String msg = "Unsupported input type for BinCond" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
}
}
private void insertLeftCastForBinCond(LOBinCond binCond, byte toType) {
+ LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
+
+ collectCastWarning(binCond,
+ binCond.getLhsOp().getType(),
+ toType) ;
+
OperatorKey newKey = genNewOperatorKey(binCond) ;
- LOCast cast = new LOCast(mPlan, newKey, binCond.getLhsOp(), toType) ;
- mPlan.add(cast) ;
- mPlan.disconnect(binCond.getLhsOp(), binCond) ;
+ LOCast cast = new LOCast(currentPlan, newKey, binCond.getLhsOp(), toType) ;
+ currentPlan.add(cast) ;
+ currentPlan.disconnect(binCond.getLhsOp(), binCond) ;
try {
- mPlan.connect(binCond.getLhsOp(), cast) ;
- mPlan.connect(cast, binCond) ;
+ currentPlan.connect(binCond.getLhsOp(), cast) ;
+ currentPlan.connect(cast, binCond) ;
}
catch (PlanException ioe) {
AssertionError err = new AssertionError("Explicit casting insertion") ;
@@ -531,23 +693,31 @@
throw err ;
}
binCond.setLhsOp(cast) ;
+
}
private void insertRightCastForBinCond(LOBinCond binCond, byte toType) {
+ LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
+
+ collectCastWarning(binCond,
+ binCond.getRhsOp().getType(),
+ toType) ;
+
OperatorKey newKey = genNewOperatorKey(binCond) ;
- LOCast cast = new LOCast(mPlan, newKey, binCond.getRhsOp(), toType) ;
- mPlan.add(cast) ;
- mPlan.disconnect(binCond.getRhsOp(), binCond) ;
+ LOCast cast = new LOCast(currentPlan, newKey, binCond.getRhsOp(), toType) ;
+ currentPlan.add(cast) ;
+ currentPlan.disconnect(binCond.getRhsOp(), binCond) ;
try {
- mPlan.connect(binCond.getRhsOp(), cast) ;
- mPlan.connect(cast, binCond) ;
+ currentPlan.connect(binCond.getRhsOp(), cast) ;
+ currentPlan.connect(cast, binCond) ;
}
catch (PlanException ioe) {
AssertionError err = new AssertionError("Explicit casting insertion") ;
err.initCause(ioe) ;
throw err ;
}
- binCond.setRhsOp(cast) ;
+ binCond.setRhsOp(cast) ;
+
}
/**
@@ -587,11 +757,12 @@
// good
}
else {
- throw new VisitorException("Cannot cast "
- + DataType.findTypeName(inputType)
- + " to "
- + DataType.findTypeName(expectedType)
- ) ;
+ String msg = "Cannot cast "
+ + DataType.findTypeName(inputType)
+ + " to "
+ + DataType.findTypeName(expectedType) ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
}
// cast.getType() already returns the correct type so don't have to
@@ -603,103 +774,306 @@
/***********************************************************************/
/* Relational Operators */
/***********************************************************************/
+ /*
+ All the getType() of these operators always return BAG.
+ We just have to :-
+ 1) Check types of inputs, inner plans
+ 2) Compute output schema with type information
+ (At the moment, the parser does only return GetSchema with correct aliases)
+ 3) Insert casting if necessary
+
+ */
+
+ /*
+ The output schema of LOUnion is the merge of all input schemas.
+ Operands on the left side always take precedence for aliases.
+
+ We allow type promotion here
+ */
+
+ @Override
+ protected void visit(LOUnion u) throws VisitorException {
+ List<LogicalOperator> inputs = u.getInputs() ;
+
+ // There is no point to union only one operand
+ // that should be a problem in the parser
+ if (inputs.size() < 2) {
+ AssertionError err = new AssertionError("Union with Count(Operand) < 2") ;
+ }
+
+ try {
+ Schema schema = inputs.get(0).getSchema() ;
+
+ // Keep merging one by one
+ for (int i=1; i< inputs.size() ;i++) {
+ // Assume the first input's aliases take precedence
+ schema = schema.merge(inputs.get(i).getSchema(), false) ;
+ // if they cannot be merged, we just give up
+ if (schema == null) {
+ String msg = "cannot merge schemas from inputs of UNION" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
+ }
+ }
+
+ try {
+ u.setSchema(schema);
+ }
+ catch (ParseException pe) {
+ // This should never happen
+ AssertionError err = new AssertionError("problem with computing UNION schema") ;
+ err.initCause(pe) ;
+ throw err ;
+ }
+
+ // Insert casting to inputs if necessary
+
+ for (int i=0; i< inputs.size() ;i++) {
+ insertCastForEachInBetweenIfNecessary(inputs.get(i), u, schema);
+ }
+
+
+ }
+ catch (FrontendException fee) {
+ // I don't quite understand how this can happen
+ // Anyway, just throw an exception to be on the safe side
+ String msg = "Problem while reading schemas from inputs of UNION" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ //vse.initCause(fee) ;
+ throw vse ;
+ }
+
+ }
+
+ @Override
+ protected void visit(LOSplitOutput op) throws VisitorException {
+ LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
+
+ // LOSplitOutput can only have 1 input
+ List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
+ if (list.size() != 1) {
+ throw new AssertionError("LOSplitOutput can only have 1 input") ;
+ }
+
+ try {
+ op.setSchema(list.get(0).getSchema()) ;
+ }
+ catch (ParseException pe) {
+ String msg = "Problem while reading schemas from"
+ + " inputs of LOSplitOutput " ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(pe) ;
+ throw vse ;
+ }
+ catch (FrontendException fe) {
+ String msg = "Problem while reading"
+ + " schemas from inputs of LOSplitOutput" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(fe) ;
+ throw vse ;
+ }
+ }
+
/***
+ * LODistinct, output schema should be the same as input
+ * @param op
+ * @throws VisitorException
+ */
+
+ @Override
+ protected void visit(LODistinct op) throws VisitorException {
+ LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
+ List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
+ // LODistinct can only have 1 input
+
+ try {
+ op.setSchema(list.get(0).getSchema()) ;
+ }
+ catch (ParseException pe) {
+ String msg = "Problem while reading"
+ + " schemas from inputs of LODistinct" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(pe) ;
+ throw vse ;
+ }
+ catch (FrontendException fe) {
+ String msg = "Problem while reading"
+ + " schemas from inputs of LODistinct" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(fe) ;
+ throw vse ;
+ }
+ }
+
+ /***
+ * Returns the concatenation of all fields from all input operators
+ * @param cs
+ * @throws VisitorException
+ */
+ protected void visit(LOCross cs) throws VisitorException {
+ List<LogicalOperator> inputs = cs.getInputs() ;
+ List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
+
+ try {
+ for(LogicalOperator op: inputs) {
+ // All of inputs are relational operators
+ // so we can access getSchema()
+ Schema inputSchema = op.getSchema() ;
+ for(int i=0; i < inputSchema.size(); i++) {
+ // For types other than tuple
+ if (inputSchema.getField(0).type != DataType.TUPLE) {
+ fsList.add(new FieldSchema(inputSchema.getField(0).alias,
+ inputSchema.getField(0).type)) ;
+ }
+ // For tuple type
+ else {
+ fsList.add(new FieldSchema(inputSchema.getField(0).alias,
+ inputSchema.getField(0).schema)) ;
+ }
+ }
+ }
+ }
+ catch (ParseException pe) {
+ String msg = "Problem while reading"
+ + " schemas from inputs of CROSS" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(pe) ;
+ throw vse ;
+ }
+ catch (FrontendException fe) {
+ String msg = "Problem while reading"
+ + " schemas from inputs of CROSS" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(fe) ;
+ throw vse ;
+ }
+
+ try {
+ cs.setSchema(new Schema(fsList));
+ }
+ catch (ParseException pe) {
+ String msg = "Problem while reading"
+ + " schemas from inputs of CROSS" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(pe) ;
+ throw vse ;
+ }
+ }
+
+ /***
* The schema of sort output will be the same as sort input.
*
*/
-
- /*
+
protected void visit(LOSort s) throws VisitorException {
-
- // TODO: Why doesn't LOSort have getInput() ???
- List<LogicalOperator> inputList = mPlan.getPredecessors(s) ;
-
- if (inputList.size() != 1) {
- throw new AssertionError("LOSort cannot have more than one input") ;
- }
-
- LogicalOperator input = inputList.get(0) ;
+
+ LogicalOperator input = s.getInput() ;
// Type checking internal plans.
for(int i=0;i < s.getSortColPlans().size(); i++) {
- LogicalPlan sortColPlan = s.getSortColPlans().get(i) ;
- checkInnerPlan(sortColPlan, input.getSchema()) ;
-
- // Check that the inner plan has only 1 output
+ LogicalPlan sortColPlan = s.getSortColPlans().get(i) ;
+
+ // Check that the inner plan has only 1 output port
if (!sortColPlan.isSingleLeafPlan()) {
- throw new VisitorException("Sort's inner plans can only have one output (leaf)") ;
+ String msg = "LOSort's sort plan can only have one output (leaf)" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
}
+
+ checkInnerPlan(sortColPlan, input) ;
+
+ // TODO: May have to check SortFunc compatibility here in the future
}
- s.setType(input.getType()) ; // This should be bag always.
-
- // I assume we can enforce schema in Bags, so this has to be done
- try {
+ s.setType(input.getType()) ; // This should be bag always.
+
+ try {
s.setSchema(input.getSchema()) ;
- }
- catch (IOException ioe) {
- VisitorException vse = new VisitorException("Problem while checking sort schema") ;
+ }
+ catch (FrontendException ioe) {
+ String msg = "Problem while reconciling output schema of LOSort" ;
+ msgCollector.collect(msg, MessageType.Error);
+ VisitorException vse = new VisitorException(msg) ;
vse.initCause(ioe) ;
throw vse ;
}
// TODO: Is this ParseException applicable ?
catch (ParseException pe) {
- VisitorException vse = new VisitorException("Problem while checking sort schema") ;
+ String msg = "Problem while reconciling output schema of LOSort" ;
+ msgCollector.collect(msg, MessageType.Error);
+ VisitorException vse = new VisitorException(msg) ;
vse.initCause(pe) ;
throw vse ;
}
}
- */
+
+
/***
* The schema of filter output will be the same as filter input
*/
- /*
+
+ @Override
protected void visit(LOFilter filter) throws VisitorException {
-
+
LogicalOperator input = filter.getInput() ;
- LogicalPlan condPlan = filter.getConditionPlan() ;
- checkInnerPlan(condPlan, input.getSchema()) ;
+ LogicalPlan comparisonPlan = filter.getComparisonPlan() ;
- // Check that the inner plan has only 1 output
- if (!condPlan.isSingleLeafPlan()) {
- throw new VisitorException("Filter's cond plan can only have one output (leaf)") ;
+ // Check that the inner plan has only 1 output port
+ if (!comparisonPlan.isSingleLeafPlan()) {
+ String msg = "Filter's cond plan can only have one output (leaf)" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
}
+
+ checkInnerPlan(comparisonPlan, input) ;
- // TODO: We don't have a way to getOutputType of inner plan yet!
- byte innerCondType = condPlan.getLeaves().get(0).getType() ;
+ byte innerCondType = comparisonPlan.getLeaves().get(0).getType() ;
if (innerCondType != DataType.BOOLEAN) {
- throw new VisitorException("Filter's condition must be boolean") ;
+ String msg = "Filter's condition must evaluate to boolean" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
}
filter.setType(input.getType()) ; // This should be bag always.
-
- // I assume we can enforce schema in Bags, so this has to be done
+
try {
filter.setSchema(input.getSchema()) ;
}
- catch (IOException ioe) {
- VisitorException vse = new VisitorException("Problem while reconciling output schema") ;
+ catch (FrontendException ioe) {
+ String msg = "Problem while reconciling output schema of LOFilter" ;
+ msgCollector.collect(msg, MessageType.Error);
+ VisitorException vse = new VisitorException(msg) ;
vse.initCause(ioe) ;
throw vse ;
}
// TODO: Is this ParseException applicable ?
catch (ParseException pe) {
- VisitorException vse = new VisitorException("Problem while reconciling output schema") ;
+ String msg = "Problem while reconciling output schema of LOFilter" ;
+ msgCollector.collect(msg, MessageType.Error);
+ VisitorException vse = new VisitorException(msg) ;
vse.initCause(pe) ;
throw vse ;
- };
-
+ }
+
}
- */
+
/***
* The schema of split output will be the same as split input
*/
- /*
+
protected void visit(LOSplit split) throws VisitorException {
-
+
// TODO: Why doesn't LOSplit have getInput() ???
List<LogicalOperator> inputList = mPlan.getPredecessors(split) ;
@@ -710,42 +1084,53 @@
LogicalOperator input = inputList.get(0) ;
// Checking internal plans.
+ ArrayList<LogicalPlan> condPlans
+ = new ArrayList<LogicalPlan>(split.getConditionPlans()) ;
+
for(int i=0;i < split.getConditionPlans().size(); i++) {
- LogicalPlan condPlan = split.getConditionPlans().get(i) ;
- checkInnerPlan(condPlan, input.getSchema()) ;
-
- // Check that the inner plan has only 1 output
+ LogicalPlan condPlan = condPlans.get(i) ;
+
+ // Check that the inner plan has only 1 output port
if (!condPlan.isSingleLeafPlan()) {
- throw new VisitorException("Split's inner plans can only have one output (leaf)") ;
+ String msg = "Split's cond plan can only have one output (leaf)" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
}
+ checkInnerPlan(condPlan, input) ;
+
byte innerCondType = condPlan.getLeaves().get(0).getType() ;
if (innerCondType != DataType.BOOLEAN) {
- throw new VisitorException("Split's conditions must be boolean") ;
+ String msg = "Split's condition must evaluate to boolean" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
}
}
split.setType(input.getType()) ; // This should be bag always
- // I assume we can enforce schema in Bags, so this has to be done
- try {
+ try {
split.setSchema(input.getSchema()) ;
- }
- catch (IOException ioe) {
- VisitorException vse = new VisitorException("Problem while checking sort schema") ;
+ }
+ catch (FrontendException ioe) {
+ String msg = "Problem while reconciling output schema of LOSplit" ;
+ msgCollector.collect(msg, MessageType.Error);
+ VisitorException vse = new VisitorException(msg) ;
vse.initCause(ioe) ;
throw vse ;
}
// TODO: Is this ParseException applicable ?
catch (ParseException pe) {
- VisitorException vse = new VisitorException("Problem while checking sort schema") ;
+ String msg = "Problem while reconciling output schema of LOSplit" ;
+ msgCollector.collect(msg, MessageType.Error);
+ VisitorException vse = new VisitorException(msg) ;
vse.initCause(pe) ;
throw vse ;
- };
+ }
}
- */
+
/**
* The output schema will be generated.
*/
@@ -793,54 +1178,111 @@
* Still have questions about LOCOGroup internal structure
* so this may not look quite right
*/
- /*
protected void visit(LOCogroup cg) throws VisitorException {
- // Type checking internal plans.
// TODO: Do all the GroupBy cols from all inputs have
- // to have the same type??? I assume "no"
- for(int i=0;i < cg.getGroupByPlans().size(); i++) {
-
- LogicalPlan groupColPlan = cg.getGroupByPlans().get(i) ;
- // TODO: Question!!!, why getInputs() here not returning
- // List<LogicalOperator> in the latest implementation ???
- LogicalOperator input = cg.getInputs().get(i) ;
- checkInnerPlan(groupColPlan, input.getSchema()) ;
-
- // Check that the inner plan has only 1 output
- // If we group by more than one fields, this should be tuple
- if (!groupColPlan.isSingleLeafPlan()) {
- throw new VisitorException("Cogroup's inner plans can only have one output (leave)") ;
+ // TODO: to have the same type??? I assume "no"
+
+ // Group-by plans are keyed by the input operator they belong to
+ MultiMap<LogicalOperator, LogicalPlan> groupByPlans
+ = cg.getGroupByPlans() ;
+ List<LogicalOperator> inputs = cg.getInputs() ;
+
+ // Type checking internal plans.
+ for(int i=0;i < inputs.size(); i++) {
+
+ LogicalOperator input = inputs.get(i) ;
+ // Copy into a list so the plans of this input can be walked by index
+ List<LogicalPlan> innerPlans
+ = new ArrayList<LogicalPlan>(groupByPlans.get(input)) ;
+
+ for(int j=0; j < innerPlans.size(); j++) {
+
+ LogicalPlan innerPlan = innerPlans.get(j) ;
+
+ // Check that the inner plan has only 1 output port
+ if (!innerPlan.isSingleLeafPlan()) {
+ // The trailing space keeps the two halves of the message apart
+ String msg = "COGroup's inner plans can only "
+ + "have one output (leaf)" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
+ }
+
+ checkInnerPlan(innerPlan, input) ;
}
-
-
+
}
- // Generate output schema
- List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
- fsList.add(new FieldSchema("group", DataType.UNKNOWN)) ;
- for(int i=0;i < cg.getInputs().size(); i++) {
- FieldSchema fs = new FieldSchema(cg.getInputs().getAlias(), DataType.BAG) ;
- fsList.add(fs) ;
- // TODO: How do we transfer Bag's Schema in this case?
+ // Generate output schema based on the schema generated from
+ // COGroup itself
+
+ try {
+
+ Schema schema = cg.getSchema() ;
+
+ // Field i+1 of the COGroup schema is set to a bag carrying
+ // the schema of input i (field 0 is left untouched here)
+ for(int i=0; i< inputs.size(); i++) {
+ FieldSchema fs = schema.getField(i+1) ;
+ fs.type = DataType.BAG ;
+ fs.schema = inputs.get(i).getSchema() ;
+ }
+
+ cg.setType(DataType.BAG) ;
+ cg.setSchema(schema) ;
+
+ }
+ catch (FrontendException fe) {
+ String msg = "Cannot resolve COGroup output schema" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(fe) ;
+ throw vse ;
+ }
+ catch (ParseException pe) {
+ String msg = "Cannot resolve COGroup output schema" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(pe) ;
+ throw vse ;
}
- cg.setType(DataType.BAG) ;
- cg.setSchema(new Schema(fsList)) ;
}
- */
- // TODO: NOT DONE YET
- //COmmenting out this method as its not using the new APIs
- /*
+ /***
+ * Output schema of LOGenerate is a tuple schema
+ * which is the output of all inner plans
+ *
+ * Flatten also has to be taken care of here
+ *
+ */
+
protected void visit(LOGenerate g) throws VisitorException {
- // Visit each of generates projection elements.
- Iterator<ExpressionOperator> i = g.getProjections().iterator();
- while (i.hasNext()) {
- i.next().visit(this);
+ List<LogicalPlan> plans = g.getGeneratePlans() ;
+ List<Boolean> flattens = g.getFlatten() ;
+
+ // NOTE(review): the checking loop below is commented out, so "plans"
+ // and "flattens" are fetched but not used and no type/schema is set.
+ /*
+ for(int i=0;i < plans.size(); i++) {
+
+ LogicalPlan plan = plans.get(i) ;
+
+ // Check that the inner plan has only 1 output port
+ if (!plan.isSingleLeafPlan()) {
+ String msg = "Generate's expression plan can "
+ + " only have one output (leaf)" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
+ }
+
+ checkInnerPlan(plan, ) ;
+
+ byte innerCondType = condPlan.getLeaves().get(0).getType() ;
+ if (innerCondType != DataType.BOOLEAN) {
+ String msg = "Split's condition must evaluate to boolean" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new VisitorException(msg) ;
+ }
+
}
+ */
+
}
- */
/***
* This does:-
@@ -849,13 +1291,13 @@
* 2) Type checking of the inner plan
* NOTE: This helper method only supports one source schema
*
- * @param innerPlan
- * @param inputSchema
+ * @param innerPlan the inner plan
+ * @param srcOuterOp the source data operator
* @throws VisitorException
*/
- /*
+
private void checkInnerPlan(LogicalPlan innerPlan,
- Schema inputSchema)
+ LogicalOperator srcOuterOp)
throws VisitorException {
// Preparation
int errorCount = 0 ;
@@ -863,15 +1305,39 @@
if (rootList.size() < 1) {
throw new AssertionError("Inner plan is poorly constructed") ;
}
-
+
+ Schema inputSchema = null ;
+ try {
+ inputSchema = srcOuterOp.getSchema() ;
+ }
+ catch(FrontendException fe) {
+ String msg = "Cannot not get schema out of "
+ + srcOuterOp.getClass().getSimpleName() ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
+ }
+
// Actual checking
for(LogicalOperator op: rootList) {
// TODO: Support map dereference
if (op instanceof LOProject) {
// Get the required field from input operator's schema
// Copy type and schema (if any) to the inner plan root
- // TODO: Is this the correct way of getting a column name from LOProject ???
- FieldSchema fs = inputSchema.getField(((LOProject)op).getColumnName()) ;
+ LOProject project = (LOProject)op ;
+
+ FieldSchema fs = null ;
+ try {
+ fs = inputSchema.getField(project.getCol()) ;
+ }
+ catch(ParseException pe) {
+ String msg = "Cannot not get schema out of "
+ + srcOuterOp.getClass().getSimpleName() ;
+ msgCollector.collect(msg, MessageType.Error);
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(pe) ;
+ throw vse ;
+ }
+
if (fs != null) {
op.setType(fs.type) ;
if (fs.type == DataType.TUPLE) {
@@ -879,9 +1345,10 @@
op.setSchema(fs.schema) ;
}
catch (ParseException pe) {
- VisitorException vse = new VisitorException(
- "A schema from a field in input tuple cannot "
- + " be reconciled with inner plan schema ") ;
+ String msg = "A schema from a field in input tuple cannot "
+ + " be reconciled with inner plan schema " ;
+ msgCollector.collect(msg, MessageType.Error);
+ VisitorException vse = new VisitorException(msg) ;
vse.initCause(pe) ;
throw vse ;
}
@@ -891,7 +1358,10 @@
else {
errorCount++ ;
}
- }
+ }
+ else if (op instanceof LOConst) {
+ // don't have to do anything
+ }
else {
throw new AssertionError("Unsupported root operator in inner plan") ;
}
@@ -900,8 +1370,9 @@
// Throw an exception if we found errors
if (errorCount > 0) {
// TODO: Should indicate the field names or indexes here
- VisitorException vse = new VisitorException(
- "Some required fields in inner plan cannot be found in input") ;
+ String msg = "Some required fields in inner plan cannot be found in input" ;
+ msgCollector.collect(msg, MessageType.Error);
+ VisitorException vse = new VisitorException(msg) ;
throw vse ;
}
@@ -913,8 +1384,187 @@
popWalker() ;
}
- */
-
+
+
+ /***
+  * Inserts a ForEach that casts fields between two adjacent relational
+  * operators, but only if at least one field actually needs a cast.
+  * Currently this only does "shallow" casting: only the top-level type
+  * of each field is compared against the target type.
+  *
+  * Violated preconditions (operators in different plans, operators not
+  * in the current plan, operators not adjacent, schema size mismatch)
+  * are reported with AssertionError.
+  *
+  * @param fromOp the upstream operator whose output may need casting
+  * @param toOp the downstream operator; fromOp must be one of its
+  *             direct predecessors in the current plan
+  * @param targetSchema schema holding the target type of every output
+  *                     field of fromOp (same size as fromOp's schema)
+  * @throws VisitorException if a field schema cannot be read from the
+  *                          schema of fromOp
+  */
+ private void insertCastForEachInBetweenIfNecessary(LogicalOperator fromOp,
+                                                    LogicalOperator toOp,
+                                                    Schema targetSchema)
+                                                 throws VisitorException {
+     LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
+
+     // Make sure that two operators are in the same plan
+     if (fromOp.getPlan() != toOp.getPlan()) {
+         throw new AssertionError("Two operators have to be in the same plan") ;
+     }
+     // Make sure that they are in the plan we're looking at.
+     // This must compare against currentPlan; comparing the two
+     // operators' plans again would just repeat the check above.
+     if (fromOp.getPlan() != currentPlan) {
+         throw new AssertionError("Cannot manipulate any other plan"
+                                  +" than the current one") ;
+     }
+
+     // Make sure that they are adjacent and the direction
+     // is from "fromOp" to "toOp"
+     List<LogicalOperator> preList = currentPlan.getPredecessors(toOp) ;
+     boolean found = false ;
+     // A missing predecessor list is treated the same as "not adjacent"
+     if (preList != null) {
+         for(LogicalOperator tmpOp: preList) {
+             // compare by reference
+             if (tmpOp == fromOp) {
+                 found = true ;
+                 break ;
+             }
+         }
+     }
+
+     if (!found) {
+         throw new AssertionError("Two operators are not adjacent") ;
+     }
+
+     // retrieve input schema to be casted
+     // this will be used later
+     Schema fromSchema = null ;
+     try {
+         fromSchema = fromOp.getSchema() ;
+     }
+     catch(FrontendException fe) {
+         AssertionError err = new AssertionError("Cannot get schema from"
+                                                 + " input operator") ;
+         err.initCause(fe) ;
+         throw err ;
+     }
+
+     // make sure the supplied targetSchema has the same number of members
+     // as number of output fields from "fromOp"
+     if (fromSchema.size() != targetSchema.size()) {
+         throw new AssertionError("Invalid input parameters in cast insert") ;
+     }
+
+     // Compose the new inner plan to be used in ForEach
+     LogicalPlan foreachPlan = new LogicalPlan() ;
+
+     // Plans inside Generate. Fields that do not need casting will only
+     // have Project. Fields that need casting will have Project + Cast
+     ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>() ;
+
+     int castNeededCounter = 0 ;
+     for(int i=0;i < fromSchema.size(); i++) {
+
+         LogicalPlan genPlan = new LogicalPlan() ;
+         // TODO: should the LogicalOp here be null in this case?
+         LOProject project = new LOProject(genPlan,
+                                           genNewOperatorKey(fromOp),
+                                           null,
+                                           i) ;
+         genPlan.add(project);
+
+         // add casting if necessary by comparing target types
+         // to the input schema
+         FieldSchema fs = null ;
+         try {
+             fs = fromSchema.getField(i) ;
+         }
+         catch(ParseException pe) {
+             String msg = "Problem while reading"
+                          + " field schema from input while"
+                          + " insert casting " ;
+             msgCollector.collect(msg, MessageType.Error) ;
+             VisitorException vse = new VisitorException(msg) ;
+             vse.initCause(pe) ;
+             throw vse ;
+         }
+
+         // This only does "shallow checking"
+
+         // Type the i-th field has to end up with
+         byte targetFieldType ;
+
+         try {
+             targetFieldType = targetSchema.getField(i).type ;
+         }
+         catch (ParseException e) {
+             // keep the underlying failure attached for diagnosis
+             AssertionError err = new AssertionError("Cannot get field type") ;
+             err.initCause(e) ;
+             throw err ;
+         }
+
+         if (targetFieldType != fs.type) {
+             castNeededCounter++ ;
+             LOCast cast = new LOCast(genPlan,
+                                      genNewOperatorKey(fromOp),
+                                      project,
+                                      targetFieldType) ;
+             genPlan.add(cast) ;
+             try {
+                 genPlan.connect(project, cast);
+             }
+             catch (PlanException pe) {
+                 // This should never happen
+                 AssertionError err = new AssertionError("unpected plan exception while insert casting") ;
+                 err.initCause(pe) ;
+                 throw err ;
+             }
+         }
+
+         generatePlans.add(genPlan) ;
+
+     }
+
+     // if we really need casting
+     if (castNeededCounter > 0) {
+         // Flatten List
+         // This is just cast insertion so we don't have any flatten
+         ArrayList<Boolean> flattenList = new ArrayList<Boolean>() ;
+         for(int i=0;i < targetSchema.size(); i++) {
+             flattenList.add(Boolean.FALSE) ;
+         }
+
+         // NOTE(review): generate is constructed with currentPlan but is
+         // added to foreachPlan -- confirm which plan it should reference
+         LOGenerate generate = new LOGenerate(currentPlan, genNewOperatorKey(fromOp),
+                                              generatePlans, flattenList) ;
+
+         foreachPlan.add(generate) ;
+
+         // Create ForEach to be inserted
+         LOForEach foreach = new LOForEach(currentPlan, genNewOperatorKey(fromOp), foreachPlan) ;
+
+         // Manipulate the plan structure:
+         // fromOp -> toOp becomes fromOp -> foreach -> toOp
+         currentPlan.add(foreach);
+         currentPlan.disconnect(fromOp, toOp) ;
+
+         try {
+             currentPlan.connect(fromOp, foreach);
+             currentPlan.connect(foreach, toOp);
+         }
+         catch (PlanException pe) {
+             AssertionError err = new AssertionError("Problem wiring the plan while insert casting") ;
+             err.initCause(pe) ;
+             throw err ;
+         }
+     }
+     else {
+         log.debug("Tried to insert relational casting when not necessary");
+     }
+ }
+
+ /***
+  * Reports an implicit cast to the message collector as a warning.
+  * The message names the original type, the type it is cast to and
+  * the simple class name of the operator the cast happens under.
+  *
+  * @param op operator under which the cast is inserted
+  * @param originalType type code before casting
+  * @param toType type code after casting
+  */
+ private void collectCastWarning(LogicalOperator op,
+                                 byte originalType,
+                                 byte toType) {
+     String fromName = DataType.findTypeName(originalType) ;
+     String targetName = DataType.findTypeName(toType) ;
+     String operatorName = op.getClass().getSimpleName() ;
+
+     StringBuilder warning = new StringBuilder() ;
+     warning.append(fromName)
+            .append(" is implicitly casted to ")
+            .append(targetName)
+            .append(" under ")
+            .append(operatorName)
+            .append(" Operator") ;
+
+     msgCollector.collect(warning.toString(), MessageType.Warning) ;
+ }
+
/***
* We need the neighbor to make sure that the new key is in the same scope
*/
@@ -924,7 +1574,8 @@
return new OperatorKey(scope, newId) ;
}
- protected void visit(LOLoad load) {
+ // Intentionally empty: this visitor performs no work for LOLoad.
+ // VisitorException is declared but never thrown from this body.
+ protected void visit(LOLoad load)
+ throws VisitorException {
// do nothing
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java?rev=656795&r1=656794&r2=656795&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java Thu May 15 12:13:41 2008
@@ -29,7 +29,7 @@
* If there are errors during validation, all of the errors have
* to be collected in the supplied messageCollector. The exception should
* be thrown only when the validation logic finds something too bad
- * that the other validation logics should not try to do more work.
+ * that other validation logics should not try to do more work.
*
*/
public abstract void validate(P plan, CompilationMessageCollector messageCollector)
@@ -37,8 +37,8 @@
/**
* This convenient method is used when:
- * - if an exception is thrown from the current validation logic,
- * the whole validation pipeline should stop.
+ * - if an exception being thrown from the current validation logic
+ * indicates that the whole validation pipeline should stop.
* @param visitor
* @param messageCollector
* @throws PlanValidationException
@@ -60,9 +60,10 @@
/**
* This convenient method is used when:
- * - if an exception is thrown from the current validation logic,
- * the whole validation pipeline should keep going by continuing
- * with the next validation logic in the pipeline
+ * - if an exception being thrown from the current validation logic
+ * indicates that the whole validation pipeline should keep going
+ * by continuing with the next validation logic in the pipeline
+ * (skip the rest of the current logic)
* @param visitor
* @param messageCollector
* @throws PlanValidationException
@@ -80,4 +81,29 @@
}
}
+ /**
+  * Runs the given visitor and converts any VisitorException into a
+  * PlanValidationException, which tells the caller that the whole
+  * validation pipeline should stop.
+  *
+  * The visitor is expected to have recorded its own error message in
+  * the message collector already, so nothing is collected here and
+  * the messageCollector argument is not touched by this method.
+  *
+  * @param visitor the visitor to run
+  * @param messageCollector collector the visitor is assumed to have
+  *                         written its error messages to
+  * @throws PlanValidationException wrapping the VisitorException
+  *                                 raised by the visitor
+  */
+ protected void validateSkipCollectException(PlanVisitor<O, P> visitor,
+                                             CompilationMessageCollector messageCollector)
+                                         throws PlanValidationException {
+     try {
+         visitor.visit() ;
+     }
+     catch(VisitorException visitFailure) {
+         String reason = "An unexpected exception caused the validation to stop" ;
+         throw new PlanValidationException(reason, visitFailure) ;
+     }
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java?rev=656795&r1=656794&r2=656795&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java Thu May 15 12:13:41 2008
@@ -56,6 +56,9 @@
*/
public abstract PlanWalker<O, P> spawnChildWalker(P plan);
+ /**
+  * Gives access to the plan stored in this walker's mPlan.
+  *
+  * @return the value of mPlan
+  */
+ public P getPlan() {
+     return this.mPlan ;
+ }
}