You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/07 02:15:44 UTC
svn commit: r1100420 [14/19] - in /pig/branches/branch-0.9: ./ src/
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/optimizer/ src/org/apach...
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Sat May 7 00:15:40 2011
@@ -1,3194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer.validators;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-import java.util.HashSet;
-import java.util.TreeMap;
-
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.Algebraic;
-import org.apache.pig.PigException;
-import org.apache.pig.PigWarning;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOConst;
-import org.apache.pig.impl.logicalLayer.LOUserFunc;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-
-import org.apache.pig.impl.logicalLayer.* ;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType ;
-import org.apache.pig.impl.plan.*;
-import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.streaming.StreamingCommand.Handle;
-import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Visitor for type checking. To keep this first implementation simple,
- * an exception is thrown as soon as something does not look right.
- * This is not very smart, e.g. when the plan has another, unrelated branch.
- *
- */
-public class TypeCheckingVisitor extends LOVisitor {
-
- private static final int INF = -1;
-
- private static final Log log = LogFactory.getLog(TypeCheckingVisitor.class);
-
- private CompilationMessageCollector msgCollector = null ;
-
- private boolean strictMode = false ;
-
- private String currentAlias = null;
-
- public static final MultiMap<Byte, Byte> castLookup = new MultiMap<Byte, Byte>();
- static{
- // Ordering here decides the score for the best-fit function.
- // Do not change the order. Conversion to a smaller type is preferred
- // over conversion to a bigger type, where the ordering of types is:
- // INTEGER, LONG, FLOAT, DOUBLE, CHARARRAY, TUPLE, BAG, MAP
- // (from small to big).
-// castLookup.put(DataType.BOOLEAN, DataType.INTEGER);
-// castLookup.put(DataType.BOOLEAN, DataType.LONG);
-// castLookup.put(DataType.BOOLEAN, DataType.FLOAT);
-// castLookup.put(DataType.BOOLEAN, DataType.DOUBLE);
-// castLookup.put(DataType.BOOLEAN, DataType.CHARARRAY);
- castLookup.put(DataType.INTEGER, DataType.LONG);
- castLookup.put(DataType.INTEGER, DataType.FLOAT);
- castLookup.put(DataType.INTEGER, DataType.DOUBLE);
-// castLookup.put(DataType.INTEGER, DataType.CHARARRAY);
- castLookup.put(DataType.LONG, DataType.FLOAT);
- castLookup.put(DataType.LONG, DataType.DOUBLE);
-// castLookup.put(DataType.LONG, DataType.CHARARRAY);
- castLookup.put(DataType.FLOAT, DataType.DOUBLE);
-// castLookup.put(DataType.FLOAT, DataType.CHARARRAY);
-// castLookup.put(DataType.DOUBLE, DataType.CHARARRAY);
-// castLookup.put(DataType.BYTEARRAY, DataType.BOOLEAN);
- castLookup.put(DataType.BYTEARRAY, DataType.INTEGER);
- castLookup.put(DataType.BYTEARRAY, DataType.LONG);
- castLookup.put(DataType.BYTEARRAY, DataType.FLOAT);
- castLookup.put(DataType.BYTEARRAY, DataType.DOUBLE);
- castLookup.put(DataType.BYTEARRAY, DataType.CHARARRAY);
- castLookup.put(DataType.BYTEARRAY, DataType.TUPLE);
- castLookup.put(DataType.BYTEARRAY, DataType.BAG);
- castLookup.put(DataType.BYTEARRAY, DataType.MAP);
- }
-
-    /**
-     * Creates a type checker that walks the given plan in dependency order.
-     *
-     * @param plan the logical plan to type check
-     * @param messageCollector receives the messages produced while checking
-     */
-    public TypeCheckingVisitor(LogicalPlan plan,
-                               CompilationMessageCollector messageCollector) {
-        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
-        this.msgCollector = messageCollector;
-    }
-
-    /**
-     * Fallback for callers that only hold an {@link ExpressionOperator}
-     * reference: forwards to the overload that matches the first runtime
-     * type test that succeeds. Operators matching none of the tests are ignored.
-     *
-     * @param eOp the expression operator to type check
-     * @throws VisitorException if the selected overload rejects the operator
-     */
-    @Override
-    protected void visit(ExpressionOperator eOp) throws VisitorException {
-        // The tests run in a fixed order and the first match wins.
-        // NOTE(review): if LONegative/LONot are UnaryExpressionOperators, their
-        // dedicated tests further down can never be reached - confirm.
-        if (eOp instanceof BinaryExpressionOperator) {
-            visit((BinaryExpressionOperator) eOp);
-            return;
-        }
-        if (eOp instanceof UnaryExpressionOperator) {
-            visit((UnaryExpressionOperator) eOp);
-            return;
-        }
-        if (eOp instanceof LOConst) {
-            visit((LOConst) eOp);
-            return;
-        }
-        if (eOp instanceof LOBinCond) {
-            visit((LOBinCond) eOp);
-            return;
-        }
-        if (eOp instanceof LOCast) {
-            visit((LOCast) eOp);
-            return;
-        }
-        if (eOp instanceof LORegexp) {
-            visit((LORegexp) eOp);
-            return;
-        }
-        if (eOp instanceof LOUserFunc) {
-            visit((LOUserFunc) eOp);
-            return;
-        }
-        if (eOp instanceof LOProject) {
-            visit((LOProject) eOp);
-            return;
-        }
-        if (eOp instanceof LONegative) {
-            visit((LONegative) eOp);
-            return;
-        }
-        if (eOp instanceof LONot) {
-            visit((LONot) eOp);
-            return;
-        }
-        if (eOp instanceof LOMapLookup) {
-            visit((LOMapLookup) eOp);
-        }
-        // TODO: Check that all operators are included here
-    }
-
-
-    /**
-     * Fallback for callers that only hold a {@link LogicalOperator} reference:
-     * forwards to the overload that matches the first runtime type test that
-     * succeeds. Operators matching none of the tests are ignored.
-     *
-     * @param lOp the relational operator to type check
-     * @throws VisitorException if the selected overload rejects the operator
-     */
-    @Override
-    protected void visit(LogicalOperator lOp) throws VisitorException {
-        // The tests run in a fixed order and the first match wins.
-        if (lOp instanceof LOLoad) {
-            visit((LOLoad) lOp);
-            return;
-        }
-        if (lOp instanceof LODistinct) {
-            visit((LODistinct) lOp);
-            return;
-        }
-        if (lOp instanceof LOFilter) {
-            visit((LOFilter) lOp);
-            return;
-        }
-        if (lOp instanceof LOUnion) {
-            visit((LOUnion) lOp);
-            return;
-        }
-        if (lOp instanceof LOSplit) {
-            visit((LOSplit) lOp);
-            return;
-        }
-        if (lOp instanceof LOSplitOutput) {
-            visit((LOSplitOutput) lOp);
-            return;
-        }
-        if (lOp instanceof LOCogroup) {
-            visit((LOCogroup) lOp);
-            return;
-        }
-        if (lOp instanceof LOSort) {
-            visit((LOSort) lOp);
-            return;
-        }
-        if (lOp instanceof LOForEach) {
-            visit((LOForEach) lOp);
-            return;
-        }
-        if (lOp instanceof LOGenerate) {
-            visit((LOGenerate) lOp);
-            return;
-        }
-        if (lOp instanceof LOCross) {
-            visit((LOCross) lOp);
-        }
-        // TODO: Check that all operators are included here
-    }
-
-
-
-    /**
-     * Type checks a projection by resolving its field schema.
-     *
-     * @param pj the projection to check
-     * @throws VisitorException if the field schema cannot be obtained
-     */
-    protected void visit(LOProject pj) throws VisitorException {
-        this.resolveLOProjectType(pj);
-    }
-
- private void resolveLOProjectType(LOProject pj) throws VisitorException {
-
- try {
- pj.getFieldSchema() ;
- }
- catch (FrontendException fe) {
- int errCode = 1035;
- String msg = "Error getting LOProject's input schema" ;
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
- }
- }
-
-    /**
-     * Constants need no checking here: their type information is expected to
-     * have been attached to the LOConst during parsing.
-     *
-     * @param cs the constant (unused)
-     */
-    @Override
-    protected void visit(LOConst cs) throws VisitorException {
-        // Intentionally empty.
-    }
-
-    /**
-     * Type checks a map lookup: the key must be of an atomic type, the lookup
-     * takes the map's value type, and the input is cast to MAP when it is not
-     * already one.
-     *
-     * @param map the lookup operator to check
-     * @throws VisitorException with error 1036 if the key is not atomic
-     */
-    @Override
-    public void visit(LOMapLookup map) throws VisitorException {
-        final byte keyType = DataType.findType(map.getLookUpKey());
-        if (!DataType.isAtomic(keyType)) {
-            final String msg = "Map key should be a basic type";
-            msgCollector.collect(msg, MessageType.Error);
-            throw new TypeCheckerException(msg, 1036, PigException.INPUT);
-        }
-
-        map.setType(map.getValueType());
-
-        // The predecessor does not produce a map: put a cast in front of the lookup.
-        final boolean inputIsMap = map.getMap().getType() == DataType.MAP;
-        if (!inputIsMap) {
-            insertCast(map, DataType.MAP, null, map.getMap());
-        }
-    }
-
- /**
- * LORegexp expects CharArray as input
- * Itself always returns Boolean
- * @param rg
- */
- @Override
- protected void visit(LORegexp rg)
- throws VisitorException {
-
- // We allow BYTEARRAY to be converted to CHARARRAY
- if (rg.getOperand().getType() == DataType.BYTEARRAY)
- {
- insertCastForRegexp(rg) ;
- }
-
- // Other than that if it's not CharArray just say goodbye
- if (rg.getOperand().getType() != DataType.CHARARRAY)
- {
- int errCode = 1037;
- String msg = "Operand of Regex can be CharArray only" ;
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
- }
-
- private void insertCastForRegexp(LORegexp rg) throws VisitorException {
- insertCast(rg, DataType.CHARARRAY, null, rg.getOperand());
- }
-
-    /**
-     * Type checks a logical AND: after null constants have been cast to
-     * boolean, both operands must be BOOLEAN.
-     *
-     * @param binOp the AND operator to check
-     * @throws VisitorException with error 1038 if either operand is not boolean
-     */
-    public void visit(LOAnd binOp) throws VisitorException {
-        // A null constant on either side is cast to boolean first.
-        insertCastsForNullToBoolean(binOp);
-
-        final byte leftType = binOp.getLhsOperand().getType();
-        final byte rightType = binOp.getRhsOperand().getType();
-        if (leftType == DataType.BOOLEAN && rightType == DataType.BOOLEAN) {
-            return;
-        }
-
-        final String msg = "Operands of AND/OR can be boolean only";
-        msgCollector.collect(msg, MessageType.Error);
-        throw new TypeCheckerException(msg, 1038, PigException.INPUT);
-    }
-
- /**
- * @param binOp
- * @throws VisitorException
- */
- private void insertCastsForNullToBoolean(BinaryExpressionOperator binOp)
- throws VisitorException {
- if (binOp.getLhsOperand() instanceof LOConst
- && ((LOConst) binOp.getLhsOperand()).getValue() == null)
- insertLeftCastForBinaryOp(binOp, DataType.BOOLEAN);
- if (binOp.getRhsOperand() instanceof LOConst
- && ((LOConst) binOp.getRhsOperand()).getValue() == null)
- insertRightCastForBinaryOp(binOp, DataType.BOOLEAN);
- }
-
-    /**
-     * Type checks a logical OR: after null constants have been cast to
-     * boolean, both operands must be BOOLEAN.
-     *
-     * @param binOp the OR operator to check
-     * @throws VisitorException with error 1038 if either operand is not boolean
-     */
-    @Override
-    public void visit(LOOr binOp) throws VisitorException {
-        // A null constant on either side is cast to boolean first.
-        insertCastsForNullToBoolean(binOp);
-
-        final byte leftType = binOp.getLhsOperand().getType();
-        final byte rightType = binOp.getRhsOperand().getType();
-        if (leftType == DataType.BOOLEAN && rightType == DataType.BOOLEAN) {
-            return;
-        }
-
-        final String msg = "Operands of AND/OR can be boolean only";
-        msgCollector.collect(msg, MessageType.Error);
-        throw new TypeCheckerException(msg, 1038, PigException.INPUT);
-    }
-
- @Override
- public void visit(LOMultiply binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
-
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
-
- // return the bigger type
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(rhsType)) ) {
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(lhsType)) ) {
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // Cast both operands to double
- insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
- insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
- }
- else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "Multiplication", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- try {
- binOp.regenerateFieldSchema();
- } catch (FrontendException fe) {
- int errCode = 1040;
- String msg = "Could not set Multiply field schema";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
- }
- }
-
- @Override
- public void visit(LODivide binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
-
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
-
- // return the bigger type
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(rhsType)) ) {
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(lhsType)) ) {
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // Cast both operands to double
- insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
- insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
- }
- else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "Division", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- try {
- binOp.regenerateFieldSchema();
- } catch (FrontendException fe) {
- int errCode = 1040;
- String msg = "Could not set Divide field schema";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
- }
- }
-
- @Override
- public void visit(LOAdd binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
-
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
-
- // return the bigger type
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(rhsType)) ) {
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(lhsType)) ) {
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // Cast both operands to double
- insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
- insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
- }
- else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "Add", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
- try {
- binOp.regenerateFieldSchema();
- } catch (FrontendException fe) {
- int errCode = 1040;
- String msg = "Could not set Add field schema";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
- }
- }
-
- @Override
- public void visit(LOSubtract binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
-
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
-
- // return the bigger type
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(rhsType)) ) {
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- (DataType.isNumberType(lhsType)) ) {
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // Cast both operands to double
- insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
- insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
- }
- else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "Subtract", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
- try {
- binOp.regenerateFieldSchema();
- } catch (FrontendException fe) {
- int errCode = 1040;
- String msg = "Could not set Subtract field schema";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
- }
- }
-
-
-
- @Override
- public void visit(LOGreaterThan binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
-
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
- // If not the same type, we cast them to the same
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
- }
- else if ( (lhsType == DataType.CHARARRAY) &&
- (rhsType == DataType.CHARARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
- ) {
- // Cast byte array to the type on rhs
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
- ) {
- // Cast byte array to the type on lhs
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "GreaterThan", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error) ;
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- }
-
- @Override
- public void visit(LOGreaterThanEqual binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
-
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
- // If not the same type, we cast them to the same
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
- }
- else if ( (lhsType == DataType.CHARARRAY) &&
- (rhsType == DataType.CHARARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
- ) {
- // Cast byte array to the type on rhs
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
- ) {
- // Cast byte array to the type on lhs
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "GreaterThanEqualTo", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error) ;
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- }
-
- @Override
- public void visit(LOLesserThan binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
- // If not the same type, we cast them to the same
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
- }
- else if ( (lhsType == DataType.CHARARRAY) &&
- (rhsType == DataType.CHARARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
- ) {
- // Cast byte array to the type on rhs
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
- ) {
- // Cast byte array to the type on lhs
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "LesserThan", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error) ;
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- }
-
- @Override
- public void visit(LOLesserThanEqual binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
-
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
- // If not the same type, we cast them to the same
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
- }
- else if ( (lhsType == DataType.CHARARRAY) &&
- (rhsType == DataType.CHARARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
- ) {
- // Cast byte array to the type on rhs
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
- ) {
- // Cast byte array to the type on lhs
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "LesserThanEqualTo", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error) ;
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- }
-
-
-
- @Override
- public void visit(LOEqual binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
-
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
-
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
-
- }
- else if ( (lhsType == DataType.CHARARRAY) &&
- (rhsType == DataType.CHARARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
- ) {
- // Cast byte array to the type on rhs
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
- ) {
- // Cast byte array to the type on lhs
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else if ( (lhsType == DataType.TUPLE) &&
- (rhsType == DataType.TUPLE) ) {
- // good
- }
- else if ( (lhsType == DataType.MAP) &&
- (rhsType == DataType.MAP) ) {
- // good
- }
- // A constant null is always bytearray - so cast it
- // to rhs type
- else if (binOp.getLhsOperand() instanceof LOConst
- && ((LOConst) binOp.getLhsOperand()).getValue() == null) {
- insertLeftCastForBinaryOp(binOp, rhsType);
- } else if (binOp.getRhsOperand() instanceof LOConst
- && ((LOConst) binOp.getRhsOperand()).getValue() == null) {
- insertRightCastForBinaryOp(binOp, lhsType);
- } else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "EqualTo", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- }
-
- @Override
- public void visit(LONotEqual binOp) throws VisitorException {
- ExpressionOperator lhs = binOp.getLhsOperand() ;
- ExpressionOperator rhs = binOp.getRhsOperand() ;
-
- byte lhsType = lhs.getType() ;
- byte rhsType = rhs.getType() ;
-
-
- if ( DataType.isNumberType(lhsType) &&
- DataType.isNumberType(rhsType) ) {
-
- byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-
- // Cast smaller type to the bigger type
- if (lhsType != biggerType) {
- insertLeftCastForBinaryOp(binOp, biggerType) ;
- }
- else if (rhsType != biggerType) {
- insertRightCastForBinaryOp(binOp, biggerType) ;
- }
-
- }
- else if ( (lhsType == DataType.CHARARRAY) &&
- (rhsType == DataType.CHARARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- (rhsType == DataType.BYTEARRAY) ) {
- // good
- }
- else if ( (lhsType == DataType.BYTEARRAY) &&
- ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
- ) {
- // Cast byte array to the type on rhs
- insertLeftCastForBinaryOp(binOp, rhsType) ;
- }
- else if ( (rhsType == DataType.BYTEARRAY) &&
- ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
- ) {
- // Cast byte array to the type on lhs
- insertRightCastForBinaryOp(binOp, lhsType) ;
- }
- else if ( (lhsType == DataType.TUPLE) &&
- (rhsType == DataType.TUPLE) ) {
- // good
- }
- else if ( (lhsType == DataType.MAP) &&
- (rhsType == DataType.MAP) ) {
- // good
- }
- // A constant null is always bytearray - so cast it
- // to rhs type
- else if (binOp.getLhsOperand() instanceof LOConst
- && ((LOConst) binOp.getLhsOperand()).getValue() == null) {
- insertLeftCastForBinaryOp(binOp, rhsType);
- } else if (binOp.getRhsOperand() instanceof LOConst
- && ((LOConst) binOp.getRhsOperand()).getValue() == null) {
- insertRightCastForBinaryOp(binOp, lhsType);
- } else {
- int errCode = 1039;
- String msg = generateIncompatibleTypesMessage(binOp, "NotEqual", lhsType, rhsType);
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- }
-
-    /**
-     * Type checks a modulo operation. Only INTEGER and LONG operands are
-     * supported: an INTEGER next to a LONG is cast to LONG, and a BYTEARRAY
-     * on either side of an INTEGER/LONG is cast to that operand's type.
-     * The field schema is regenerated afterwards.
-     *
-     * @param binOp the modulo operator to check
-     * @throws VisitorException with error 1039 for any other operand combination,
-     *         or 1040 if the field schema cannot be regenerated
-     */
-    @Override
-    public void visit(LOMod binOp) throws VisitorException {
-        ExpressionOperator lhs = binOp.getLhsOperand();
-        ExpressionOperator rhs = binOp.getRhsOperand();
-
-        byte lhsType = lhs.getType();
-        byte rhsType = rhs.getType();
-
-        boolean lhsIsIntegral = (lhsType == DataType.INTEGER) || (lhsType == DataType.LONG);
-        boolean rhsIsIntegral = (rhsType == DataType.INTEGER) || (rhsType == DataType.LONG);
-
-        if ((lhsType == DataType.INTEGER) && (rhsType == DataType.INTEGER)) {
-            // do nothing
-        }
-        else if ((lhsType == DataType.LONG) && rhsIsIntegral) {
-            if (rhsType == DataType.INTEGER) {
-                insertRightCastForBinaryOp(binOp, DataType.LONG);
-            }
-        }
-        else if ((rhsType == DataType.LONG) && lhsIsIntegral) {
-            if (lhsType == DataType.INTEGER) {
-                insertLeftCastForBinaryOp(binOp, DataType.LONG);
-            }
-        }
-        else if ((lhsType == DataType.BYTEARRAY) && rhsIsIntegral) {
-            insertLeftCastForBinaryOp(binOp, rhsType);
-        }
-        else if ((rhsType == DataType.BYTEARRAY) && lhsIsIntegral) {
-            // Mirror of the branch above, so that "int % bytearray" is accepted
-            // just like "bytearray % int" (as the other arithmetic operators do).
-            insertRightCastForBinaryOp(binOp, lhsType);
-        }
-        else {
-            int errCode = 1039;
-            String msg = generateIncompatibleTypesMessage(binOp, "Mod", lhsType, rhsType);
-            msgCollector.collect(msg, MessageType.Error);
-            throw new TypeCheckerException(msg, errCode, PigException.INPUT);
-        }
-
-        try {
-            binOp.regenerateFieldSchema();
-        } catch (FrontendException fe) {
-            int errCode = 1040;
-            String msg = "Could not set Mod field schema";
-            msgCollector.collect(msg, MessageType.Error);
-            throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe);
-        }
-    }
-
-
- @Override
- public void visit(LONegative uniOp) throws VisitorException {
- byte type = uniOp.getOperand().getType() ;
-
-
- if (DataType.isNumberType(type)) {
- //do nothing
- }
- else if (type == DataType.BYTEARRAY) {
- insertCastForUniOp(uniOp, DataType.DOUBLE) ;
- }
- else {
- int errCode = 1041;
- String msg = "NEG can be used with numbers or Bytearray only" ;
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- try {
- uniOp.regenerateFieldSchema();
- } catch (FrontendException fe) {
- int errCode = 1040;
- String msg = "Could not set Negative field schema";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
- }
- }
-
-    /**
-     * Type checks a logical NOT: a null constant operand is first cast to
-     * BOOLEAN, after which the operand must be BOOLEAN.
-     *
-     * @param uniOp the NOT operator to check
-     * @throws VisitorException with error 1042 if the operand is not boolean
-     */
-    @Override
-    public void visit(LONot uniOp) throws VisitorException {
-        final ExpressionOperator operand = uniOp.getOperand();
-        if (operand instanceof LOConst && ((LOConst) operand).getValue() == null) {
-            insertCastForUniOp(uniOp, DataType.BOOLEAN);
-        }
-
-        // Re-read the operand: a cast may just have been inserted in front of it.
-        if (uniOp.getOperand().getType() == DataType.BOOLEAN) {
-            return;
-        }
-
-        final String msg = "NOT can be used with boolean only";
-        msgCollector.collect(msg, MessageType.Error);
-        throw new TypeCheckerException(msg, 1042, PigException.INPUT);
-    }
-
-    /**
-     * No type checking is performed for IS NULL.
-     *
-     * @param uniOp the IS NULL operator (unused)
-     */
-    @Override
-    public void visit(LOIsNull uniOp) throws VisitorException {
-        // Intentionally empty.
-    }
-
- private void insertLeftCastForBinaryOp(BinaryExpressionOperator binOp,
- byte toType ) throws VisitorException {
- insertCast(binOp, toType, null, binOp.getLhsOperand());
- }
-
- private void insertRightCastForBinaryOp(BinaryExpressionOperator binOp,
- byte toType ) throws VisitorException {
- insertCast(binOp, toType, null, binOp.getRhsOperand());
- }
-
-
- private void insertCast(ExpressionOperator node,
- byte toType, FieldSchema toFs, ExpressionOperator predecessor)
- throws VisitorException {
- LogicalPlan currentPlan = mCurrentWalker.getPlan() ;
- collectCastWarning(node, predecessor.getType(), toType);
-
- OperatorKey newKey = genNewOperatorKey(node);
- LOCast cast = new LOCast(currentPlan, newKey, toType) ;
- try {
- if (toFs!=null)
- cast.setFieldSchema(toFs);
- } catch (FrontendException e) {
- int errCode = 2217;
- String msg = "Problem setFieldSchema for " + node + " ";
- throw new TypeCheckerException(msg, errCode, PigException.BUG, e);
- }
- currentPlan.add(cast) ;
- try {
- currentPlan.insertBetween(predecessor, cast, node);
- }
- catch (PlanException pe) {
- int errCode = 2059;
- String msg = "Problem with inserting cast operator for " + node + " in plan.";
- throw new TypeCheckerException(msg, errCode, PigException.BUG, pe);
- }
- this.visit(cast);
- }
-
- /**
- * The cast insertion for UDF is slight different in that we need to link the SchemaField
- * in the cast with its parent. This is because we don't call its getSchemafield() when
- * looking for loadfuncSpec. See getLoadFuncSpec(LogicalOperator op, String parentCanonicalName)
- * for more information.
- */
- private void insertCastForUDF(LOUserFunc udf,
- FieldSchema fromFS, FieldSchema toFs, ExpressionOperator predecessor)
- throws VisitorException {
- toFs.setParent( fromFS.canonicalName, predecessor );
- insertCast( udf, fromFS.type, toFs, predecessor );
- }
-
-
- /**
- * Currently, there are two unaryOps: Neg and Not.
- */
- @Override
- protected void visit(UnaryExpressionOperator uniOp) throws VisitorException {
-
- byte type = uniOp.getOperand().getType() ;
-
- if (uniOp instanceof LONegative) {
- if (DataType.isNumberType(type)) {
- uniOp.setType(type) ;
- }
- else if (type == DataType.BYTEARRAY) {
- insertCastForUniOp(uniOp, DataType.DOUBLE) ;
- uniOp.setType(DataType.DOUBLE) ;
- }
- else {
- int errCode = 1041;
- String msg = "NEG can be used with numbers or Bytearray only" ;
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
- }
- else if (uniOp instanceof LONot) {
- if (type == DataType.BOOLEAN) {
- uniOp.setType(DataType.BOOLEAN) ;
- }
- else {
- int errCode = 1042;
- String msg = "NOT can be used with boolean only" ;
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
- }
- else {
- // undefined for this unknown unary operator
- int errCode = 1079;
- String msg = "Undefined type checking logic for unary operator: " + uniOp.getClass().getSimpleName();
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- }
-
- private void insertCastForUniOp(UnaryExpressionOperator uniOp, byte toType) throws VisitorException {
- insertCast(uniOp, toType, null, uniOp.getOperand());
- }
-
- // Type-checks a call to a user-defined function. Every argument must have a
- // usable type; the argument field schemas are collected into an input schema
- // which is matched against the overloads the EvalFunc reports through
- // getArgToFuncMapping(). If an overload is selected, the function spec is
- // switched to it (keeping the original constructor arguments) and casts are
- // inserted for the arguments. The field schema of the function is
- // regenerated at the end whether or not an overload was selected.
- //
- // Currently there is no input type information support in UserFunc
- // So we can just check if all inputs are not of any stupid type
- @Override
- protected void visit(LOUserFunc func) throws VisitorException {
-
- List<ExpressionOperator> list = func.getArguments() ;
-
- // If the dependency graph is right, all the inputs
- // must already know the types
- // s collects the field schema of every argument, in argument order
- Schema s = new Schema();
- for(ExpressionOperator op: list) {
- if (!DataType.isUsableType(op.getType())) {
- int errCode = 1014;
- String msg = "Problem with input " + op + " of User-defined function: " + func;
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
- try {
- s.add(op.getFieldSchema());
- } catch (FrontendException e) {
- int errCode = 1043;
- String msg = "Unable to retrieve field schema.";
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, e);
- }
-
- }
-
- EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(func.getFuncSpec());
-
- // ask the EvalFunc what types of inputs it can handle
- List<FuncSpec> funcSpecs = null;
- try {
- funcSpecs = ef.getArgToFuncMapping();
- } catch (Exception e) {
- int errCode = 1044;
- String msg = "Unable to get list of overloaded methods.";
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, e);
- }
-
- /**
- * Here is an explanation of the way the matching UDF funcspec will be chosen
- * based on actual types in the input schema.
- * First an "exact" match is tried for each of the fields in the input schema
- * with the corresponding fields in the candidate funcspecs' schemas.
- *
- * If the exact match fails, then first a check is made whether the input schema
- * has any bytearrays in it.
- *
- * If there are NO bytearrays in the input schema, then a best fit match is attempted
- * for the different fields. Essentially a permissible cast from one type to another
- * is given a "score" based on its position in the "castLookup" table. A final
- * score for a candidate funcspec is computed by fitPossible() as
- * (sum of the scores of the individual casts) * (number of casts).
- * If no permissible casts are possible, the candidate is given the score INF. Among
- * the candidates whose score is not INF, the candidate with the lowest score is chosen.
- *
- * If there are bytearrays in the input schema, a modified exact match is tried. In this
- * matching, bytearrays in the input schema are not considered. As a result of
- * ignoring the bytearrays, we could get multiple candidate funcspecs which match
- * "exactly" for the other columns - if this is the case, we notify the user of
- * the ambiguity and error out. Else if all other (non byte array) fields
- * matched exactly, then we can cast bytearray(s) to the corresponding type(s)
- * in the matched udf schema. If this modified exact match fails, the above best fit
- * algorithm is attempted by initially coming up with scores and candidate funcSpecs
- * (with bytearray(s) being ignored in the scoring process). Then a check is
- * made to ensure that the positions which have bytearrays in the input schema
- * have the same type (for a given position) in the corresponding positions in
- * all the candidate funcSpecs. If this is not the case, it indicates a conflict
- * and the user is notified of the error (because we have more than
- * one choice for the destination type of the cast for the bytearray). If this is the case,
- * the candidate with the lowest score is chosen.
- */
-
-
-
- FuncSpec matchingSpec = null;
- // set when the chosen overload needed casts, so the user can be warned
- boolean notExactMatch = false;
- if(funcSpecs!=null && funcSpecs.size()!=0){
- //Some function mappings found. Trying to see
- //if one of them fits the input schema
- if((matchingSpec = exactMatch(funcSpecs, s, func))==null){
- //Oops, no exact match found. Trying to see if we
- //have mappings that we can fit using casts.
- notExactMatch = true;
- if(byteArrayFound(s)){
- // try "exact" matching all other fields except the byte array
- // fields and if they all exact match and we have only one candidate
- // for the byte array cast then that's the matching one!
- if((matchingSpec = exactMatchWithByteArrays(funcSpecs, s, func))==null){
- // "exact" match with byte arrays did not work - try best fit match
- if((matchingSpec = bestFitMatchWithByteArrays(funcSpecs, s, func)) == null) {
- int errCode = 1045;
- String msg = "Could not infer the matching function for "
- + func.getFuncSpec()
- + " as multiple or none of them fit. Please use an explicit cast.";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT);
- }
- }
- } else if ((matchingSpec = bestFitMatch(funcSpecs, s)) == null) {
- // This branch is only reached when the input schema has no
- // byte arrays. The best fit match found nothing: either no
- // mapping fits the input schema or the choice was ambiguous.
- // Throw exception that we can't infer a fit.
- int errCode = 1045;
- String msg = "Could not infer the matching function for "
- + func.getFuncSpec()
- + " as multiple or none of them fit. Please use an explicit cast.";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT);
- }
- }
- }
- if(matchingSpec!=null){
- //Voila! We have a fitting match. Lets insert casts and make
- //it work.
- // notify the user about the match we picked if it was not
- // an exact match
- if(notExactMatch) {
- String msg = "Function " + func.getFuncSpec().getClassName() + "()" +
- " will be called with following argument types: " +
- matchingSpec.getInputArgsSchema() + ". If you want to use " +
- "different input argument types, please use explicit casts.";
- msgCollector.collect(msg, MessageType.Warning, PigWarning.USING_OVERLOADED_FUNCTION);
- }
- // carry the constructor arguments of the original spec over to the chosen overload
- matchingSpec.setCtorArgs(func.getFuncSpec().getCtorArgs());
- func.setFuncSpec(matchingSpec);
- insertCastsForUDF(func, s, matchingSpec.getInputArgsSchema());
-
- }
-
- //Regenerate schema as there might be new additions
- try {
- func.regenerateFieldSchema();
- } catch (FrontendException fee) {
- int errCode = 1040;
- String msg = "Could not set UserFunc field schema";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee) ;
- }
- }
-
- /**
- * Finds if there is an exact match between the schema supported by
- * one of the funcSpecs and the input schema s. Here first exact match
- * for all non byte array fields is first attempted and if there is
- * exactly one candidate, it is chosen (since the bytearray(s) can
- * just be cast to corresponding type(s) in the candidate)
- * @param funcSpecs - mappings provided by udf
- * @param s - input schema
- * @param func - LOUserfunc for which matching is requested
- * @return the matching spec if found else null
- * @throws VisitorException
- */
- private FuncSpec exactMatchWithByteArrays(List<FuncSpec> funcSpecs,
- Schema s, LOUserFunc func) throws VisitorException {
- // exact match all fields except byte array fields
- // ignore byte array fields for matching
- return exactMatchHelper(funcSpecs, s, func, true);
- }
-
- /**
- * Finds if there is an exact match between the schema supported by
- * one of the funcSpecs and the input schema s. Here an exact match
- * for all fields is attempted.
- * @param funcSpecs - mappings provided by udf
- * @param s - input schema
- * @param func - LOUserfunc for which matching is requested
- * @return the matching spec if found else null
- * @throws VisitorException
- */
- private FuncSpec exactMatch(List<FuncSpec> funcSpecs, Schema s,
- LOUserFunc func) throws VisitorException {
- // exact match all fields, don't ignore byte array fields
- return exactMatchHelper(funcSpecs, s, func, false);
- }
-
- /**
- * Tries to find the schema supported by one of funcSpecs which can
- * be obtained by inserting a set of casts to the input schema
- * @param funcSpecs - mappings provided by udf
- * @param s - input schema
- * @return the funcSpec that supports the schema that is best suited
- * to s. The best suited schema is one that has the
- * lowest score as returned by fitPossible().
- */
- private FuncSpec bestFitMatch(List<FuncSpec> funcSpecs, Schema s) {
- FuncSpec matchingSpec = null;
- long score = INF;
- long prevBestScore = Long.MAX_VALUE;
- long bestScore = Long.MAX_VALUE;
- for (Iterator<FuncSpec> iterator = funcSpecs.iterator(); iterator.hasNext();) {
- FuncSpec fs = iterator.next();
- score = fitPossible(s,fs.getInputArgsSchema());
- if(score!=INF && score<=bestScore){
- matchingSpec = fs;
- prevBestScore = bestScore;
- bestScore = score;
- }
- }
- if(matchingSpec!=null && bestScore!=prevBestScore)
- return matchingSpec;
-
- return null;
- }
-
- /**
-  * Orders (score, funcSpec) pairs by ascending score; the funcSpec itself
-  * plays no part in the ordering.
-  */
- private static class ScoreFuncSpecListComparator implements Comparator<Pair<Long, FuncSpec>> {
-
-     /* (non-Javadoc)
-      * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
-      */
-     public int compare(Pair<Long, FuncSpec> o1, Pair<Long, FuncSpec> o2) {
-         // numeric comparison of the two scores: -1, 0 or 1
-         return o1.first.compareTo(o2.first);
-     }
-
- }
-
- /**
-  * Tries to find the schema supported by one of funcSpecs which can be
-  * obtained by inserting a set of casts to the input schema, for an input
-  * schema that contains byte arrays. Candidates are scored with
-  * fitPossible() (which skips byte array fields) and sorted by score. The
-  * choice is rejected as ambiguous when the two lowest scores are equal, or
-  * when the candidates disagree on the type at a position where the input
-  * has a byte array.
-  *
-  * @param funcSpecs -
-  *            mappings provided by udf
-  * @param s -
-  *            input schema
-  * @param func -
-  *            LOUserfunc for which matching is requested (used in error
-  *            messages)
-  * @return the funcSpec that supports the schema that is best suited to s.
-  *         The best suited schema is one that has the lowest score as
-  *         returned by fitPossible(). Null if no candidate fits.
-  * @throws VisitorException
-  *             if the choice is ambiguous (error 1046) or a field schema
-  *             cannot be read (error 1043)
-  */
- private FuncSpec bestFitMatchWithByteArrays(List<FuncSpec> funcSpecs,
-         Schema s, LOUserFunc func) throws VisitorException {
-     List<Pair<Long, FuncSpec>> scoreFuncSpecList = new ArrayList<Pair<Long,FuncSpec>>();
-     for (FuncSpec fs : funcSpecs) {
-         long score = fitPossible(s, fs.getInputArgsSchema());
-         if (score != INF) {
-             scoreFuncSpecList.add(new Pair<Long, FuncSpec>(score, fs));
-         }
-     }
-
-     // if no candidates found, return null
-     if (scoreFuncSpecList.isEmpty()) {
-         return null;
-     }
-
-     if (scoreFuncSpecList.size() > 1) {
-         // sort the candidates based on score
-         Collections.sort(scoreFuncSpecList, new ScoreFuncSpecListComparator());
-
-         // if there are two (or more) candidates with the same *lowest* score
-         // we cannot choose one of them - notify the user.
-         // The scores are boxed Longs, so they are compared by value:
-         // == on the boxes would compare object references.
-         long lowestScore = scoreFuncSpecList.get(0).first.longValue();
-         long secondLowestScore = scoreFuncSpecList.get(1).first.longValue();
-         if (lowestScore == secondLowestScore) {
-             int errCode = 1046;
-             String msg = "Multiple matching functions for "
-                     + func.getFuncSpec() + " with input schemas: " + "("
-                     + scoreFuncSpecList.get(0).second.getInputArgsSchema() + ", "
-                     + scoreFuncSpecList.get(1).second.getInputArgsSchema() + "). Please use an explicit cast.";
-             msgCollector.collect(msg, MessageType.Error);
-             throw new TypeCheckerException(msg, errCode, PigException.INPUT);
-         }
-
-         // now consider the bytearray fields
-         List<Integer> byteArrayPositions = getByteArrayPositions(s);
-         // make sure there is only one type to "cast to" for the byte array
-         // positions among the candidate funcSpecs
-         Map<Integer, Pair<FuncSpec, Byte>> castToMap = new HashMap<Integer, Pair<FuncSpec, Byte>>();
-         for (Pair<Long, FuncSpec> scored : scoreFuncSpecList) {
-             FuncSpec funcSpec = scored.second;
-             Schema sch = funcSpec.getInputArgsSchema();
-             for (Integer i : byteArrayPositions) {
-                 try {
-                     if (!castToMap.containsKey(i)) {
-                         // first candidate
-                         castToMap.put(i, new Pair<FuncSpec, Byte>(funcSpec, sch
-                                 .getField(i).type));
-                     } else {
-                         // make sure the existing type from an earlier candidate
-                         // matches
-                         Pair<FuncSpec, Byte> existingPair = castToMap.get(i);
-                         if (sch.getField(i).type != existingPair.second.byteValue()) {
-                             int errCode = 1046;
-                             String msg = "Multiple matching functions for "
-                                     + func.getFuncSpec() + " with input schema: "
-                                     + "(" + existingPair.first.getInputArgsSchema()
-                                     + ", " + funcSpec.getInputArgsSchema()
-                                     + "). Please use an explicit cast.";
-                             msgCollector.collect(msg, MessageType.Error);
-                             throw new TypeCheckerException(msg, errCode, PigException.INPUT);
-                         }
-                     }
-                 } catch (FrontendException fee) {
-                     int errCode = 1043;
-                     String msg = "Unable to retrieve field schema.";
-                     throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee);
-                 }
-             }
-         }
-     }
-
-     // if we reached here, it means we have >= 1 candidates and these candidates
-     // have the same type for position which have bytearray in the input
-     // Also the candidates are stored sorted by score in a list - we can now
-     // just return the first candidate (the one with the lowest score)
-     return scoreFuncSpecList.get(0).second;
- }
-
- /**
- * Checks to see if any field of the input schema is a byte array
- * @param s - input schema
- * @return true if found else false
- * @throws VisitorException
- */
- private boolean byteArrayFound(Schema s) throws VisitorException {
- for(int i=0;i<s.size();i++){
- try {
- FieldSchema fs=s.getField(i);
- if(fs.type==DataType.BYTEARRAY){
- return true;
- }
- } catch (FrontendException fee) {
- int errCode = 1043;
- String msg = "Unable to retrieve field schema.";
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee);
- }
- }
- return false;
- }
-
- /**
- * Gets the positions in the schema which are byte arrays
- *
- * @param s -
- * input schema
- * @throws VisitorException
- */
- private List<Integer> getByteArrayPositions(Schema s)
- throws VisitorException {
- List<Integer> result = new ArrayList<Integer>();
- for (int i = 0; i < s.size(); i++) {
- try {
- FieldSchema fs = s.getField(i);
- if (fs.type == DataType.BYTEARRAY) {
- result.add(i);
- }
- } catch (FrontendException fee) {
- int errCode = 1043;
- String msg = "Unable to retrieve field schema.";
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee); }
- }
- return result;
- }
-
- /**
- * Finds if there is an exact match between the schema supported by
- * one of the funcSpecs and the input schema s
- * @param funcSpecs - mappings provided by udf
- * @param s - input schema
- * @param ignoreByteArrays - flag for whether the exact match is to computed
- * after ignoring bytearray (if true) or without ignoring bytearray (if false)
- * @return the matching spec if found else null
- * @throws VisitorException
- */
- private FuncSpec exactMatchHelper(List<FuncSpec> funcSpecs, Schema s, LOUserFunc func, boolean ignoreByteArrays) throws VisitorException {
- List<FuncSpec> matchingSpecs = new ArrayList<FuncSpec>();
- for (Iterator<FuncSpec> iterator = funcSpecs.iterator(); iterator.hasNext();) {
- FuncSpec fs = iterator.next();
- if (schemaEqualsForMatching(s, fs.getInputArgsSchema(), ignoreByteArrays)) {
- matchingSpecs.add(fs);
- }
- }
- if(matchingSpecs.size() == 0)
- return null;
-
- if(matchingSpecs.size() > 1) {
- int errCode = 1046;
- String msg = "Multiple matching functions for "
- + func.getFuncSpec() + " with input schema: "
- + "(" + matchingSpecs.get(0).getInputArgsSchema()
- + ", " + matchingSpecs.get(1).getInputArgsSchema()
- + "). Please use an explicit cast.";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT);
- }
-
- // exactly one matching spec - return it
- return matchingSpecs.get(0);
- }
-
- /***************************************************************************
-  * Relaxed schema comparison used for UDF argument matching. Two schemas
-  * match when they have the same number of fields and, field by field, the
-  * same data type. In addition, where the udf field is of a schema type and
-  * declares a non-null inner schema, the input field and the udf field must
-  * be equal according to FieldSchema.equals. A udf field of a schema type
-  * with a null inner schema accepts any input of that type.
-  *
-  * @param inputSchema schema of the actual arguments
-  * @param udfSchema schema declared by the candidate funcSpec
-  * @param ignoreByteArrays if true, input fields of type bytearray are
-  *            skipped instead of compared
-  * @return true if FieldSchemas are equal for argument matching, false
-  *         otherwise
-  */
- public static boolean schemaEqualsForMatching(Schema inputSchema,
-         Schema udfSchema, boolean ignoreByteArrays) {
-     // a null schema only matches another null schema
-     if (inputSchema == null || udfSchema == null) {
-         return inputSchema == null && udfSchema == null;
-     }
-
-     if (inputSchema.size() != udfSchema.size()) {
-         return false;
-     }
-
-     final Iterator<FieldSchema> inputIt = inputSchema.getFields().iterator();
-     final Iterator<FieldSchema> udfIt = udfSchema.getFields().iterator();
-
-     while (inputIt.hasNext()) {
-         final FieldSchema inputField = inputIt.next();
-         final FieldSchema udfField = udfIt.next();
-
-         final boolean skipped = ignoreByteArrays && inputField.type == DataType.BYTEARRAY;
-         if (skipped) {
-             continue;
-         }
-
-         if (inputField.type != udfField.type) {
-             return false;
-         }
-
-         // Only a udf field of a schema type that declares an inner schema
-         // constrains the inner schema of the input; a null inner schema
-         // means the udf takes any input of that type.
-         final boolean innerSchemaDeclared =
-                 DataType.isSchemaType(udfField.type) && udfField.schema != null;
-         if (innerSchemaDeclared
-                 && !FieldSchema.equals(inputField, udfField, false, true)) {
-             return false;
-         }
-     }
-     return true;
- }
-
- /**
- * Computes a modified version of manhattan distance between
- * the two schemas: s1 & s2. Here the value on the same axis
- * are preferred over values that change axis as this means
- * that the number of casts required will be lesser on the same
- * axis.
- *
- * However, this function ceases to be a metric as the triangle
- * inequality does not hold.
- *
- * Each schema is an s1.size() dimensional vector.
- * The ordering for each axis is as defined by castLookup.
- * Unallowed casts are returned a dist of INFINITY.
- *
- * Byte array fields of s1 are skipped entirely and contribute neither to
- * the score nor to the cast count.
- *
- * @param s1 the input schema (source types of the casts)
- * @param s2 the candidate schema (target types of the casts)
- * @return INF if either schema is null, the schemas differ in size, or a
- * field of s1 cannot be cast to the corresponding field of s2; otherwise
- * the sum over all cast fields of (position of the target type in the
- * castLookup entry of the source type, plus one), multiplied by the number
- * of fields that need a cast. Fields that already match add nothing, so
- * the result is 0 when no cast is needed.
- */
- private long fitPossible(Schema s1, Schema s2) {
- if(s1==null || s2==null) return INF;
- List<FieldSchema> sFields = s1.getFields();
- List<FieldSchema> fsFields = s2.getFields();
- if(sFields.size()!=fsFields.size())
- return INF;
- long score = 0;
- int castCnt=0;
- for(int i=0;i<sFields.size();i++){
- FieldSchema sFS = sFields.get(i);
-
- // if we have a byte array do not include it
- // in the computation of the score - bytearray
- // fields will be looked at separately outside
- // of this function
- if (sFS.type == DataType.BYTEARRAY)
- continue;
-
- FieldSchema fsFS = fsFields.get(i);
-
- // fields of a schema type are never cast: they must be equal under
- // FieldSchema.equals(sFS, fsFS, false, true) or the candidate is rejected
- if(DataType.isSchemaType(sFS.type)){
- if(!FieldSchema.equals(sFS, fsFS, false, true))
- return INF;
- }
- // fields already equal need no cast and do not change the score
- if(FieldSchema.equals(sFS, fsFS, true, true)) continue;
- if(!castLookup.containsKey(sFS.type))
- return INF;
- if(!(castLookup.get(sFS.type).contains(fsFS.type)))
- return INF;
- // the earlier the target type appears in the castLookup entry, the cheaper the cast
- score += ((List)castLookup.get(sFS.type)).indexOf(fsFS.type) + 1;
- ++castCnt;
- }
- return score * castCnt;
- }
-
- private void insertCastsForUDF(LOUserFunc udf, Schema fromSch, Schema toSch) throws VisitorException {
- List<FieldSchema> fsLst = fromSch.getFields();
- List<FieldSchema> tsLst = toSch.getFields();
- List<ExpressionOperator> args = udf.getArguments();
- int i=-1;
- for (FieldSchema fFSch : fsLst) {
- ++i;
- FieldSchema tFSch = tsLst.get(i);
- if(fFSch.type==tFSch.type) {
- continue;
- }
- insertCastForUDF(udf, fFSch, tFSch, args.get(i));
- }
- }
-
- /**
-  * For Bincond, lhsOp and rhsOp must have the same output type
-  * or both sides have to be number.
-  *
-  * The condition itself must be boolean. The two branches are reconciled
-  * as follows, in this order:
-  * - both numeric: the smaller type is cast to the bigger one;
-  * - one side bytearray and the other chararray or numeric: the bytearray
-  *   side is cast to the type of the other side;
-  * - one side a constant null: it is cast to the type and field schema of
-  *   the other side;
-  * - identical types: accepted, and for schema types the two field
-  *   schemas must additionally be compatible;
-  * - anything else is rejected with error 1050.
-  * The field schema of the BinCond is regenerated at the end.
-  *
-  * @param binCond the BinCond operator to check
-  * @throws VisitorException if the condition is not boolean, the branch
-  *         types cannot be reconciled, or a field schema cannot be read
-  *         or regenerated
-  */
- @Override
- protected void visit(LOBinCond binCond) throws VisitorException {
-
-     // high-level type checking
-     if (binCond.getCond().getType() != DataType.BOOLEAN) {
-         int errCode = 1047;
-         String msg = "Condition in BinCond must be boolean" ;
-         msgCollector.collect(msg, MessageType.Error);
-         throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
-     }
-
-     byte lhsType = binCond.getLhsOp().getType() ;
-     byte rhsType = binCond.getRhsOp().getType() ;
-
-     // If both sides are number, we can convert the smaller type to the bigger type
-     if (DataType.isNumberType(lhsType) && DataType.isNumberType(rhsType)) {
-         byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-         if (biggerType > lhsType) {
-             insertLeftCastForBinCond(binCond, biggerType, null) ;
-         }
-         else if (biggerType > rhsType) {
-             insertRightCastForBinCond(binCond, biggerType, null) ;
-         }
-         binCond.setType(biggerType) ;
-     }
-     else if ((lhsType == DataType.BYTEARRAY)
-             && ((rhsType == DataType.CHARARRAY) || (DataType
-                     .isNumberType(rhsType)))) {
-         // Cast byte array to the type on rhs
-         insertLeftCastForBinCond(binCond, rhsType, null);
-         binCond.setType(DataType.mergeType(lhsType, rhsType));
-     } else if ((rhsType == DataType.BYTEARRAY)
-             && ((lhsType == DataType.CHARARRAY) || (DataType
-                     .isNumberType(lhsType)))) {
-         // Cast byte array to the type on lhs
-         insertRightCastForBinCond(binCond, lhsType, null);
-         binCond.setType(DataType.mergeType(lhsType, rhsType));
-     }
-     // A constant null is always bytearray - so cast it
-     // to rhs type
-     else if (binCond.getLhsOp() instanceof LOConst
-             && ((LOConst) binCond.getLhsOp()).getValue() == null) {
-         try {
-             insertLeftCastForBinCond(binCond, rhsType, binCond.getRhsOp().getFieldSchema());
-         } catch (FrontendException e) {
-             int errCode = 2216;
-             String msg = "Problem getting fieldSchema for " +binCond.getRhsOp();
-             throw new TypeCheckerException(msg, errCode, PigException.BUG, e);
-         }
-     } else if (binCond.getRhsOp() instanceof LOConst
-             && ((LOConst) binCond.getRhsOp()).getValue() == null) {
-         // mirror case: the constant null on the rhs takes the lhs type
-         try {
-             insertRightCastForBinCond(binCond, lhsType, binCond.getLhsOp().getFieldSchema());
-         } catch (FrontendException e) {
-             int errCode = 2216;
-             // the field schema being read here is the one of the lhs operand
-             String msg = "Problem getting fieldSchema for " +binCond.getLhsOp();
-             throw new TypeCheckerException(msg, errCode, PigException.BUG, e);
-         }
-     } else if (lhsType == rhsType) {
-         // Matching schemas if we're working with tuples
-         if (DataType.isSchemaType(lhsType)) {
-             try {
-                 if (!Schema.FieldSchema.equals(binCond.getLhsOp().getFieldSchema(), binCond.getRhsOp().getFieldSchema(), false, true)) {
-                     int errCode = 1048;
-                     String msg = "Two inputs of BinCond must have compatible schemas."
-                         + " left hand side: " + binCond.getLhsOp().getFieldSchema()
-                         + " right hand side: " + binCond.getRhsOp().getFieldSchema();
-                     msgCollector.collect(msg, MessageType.Error) ;
-                     throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
-                 }
-                 // TODO: We may have to merge the schema here
-                 // if the previous check is not exact match
-                 // Is Schema.reconcile good enough?
-             }
-             catch (FrontendException fe) {
-                 int errCode = 1049;
-                 String msg = "Problem during evaluaton of BinCond output type" ;
-                 msgCollector.collect(msg, MessageType.Error) ;
-                 throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
-             }
-         }
-
-         // both sides agree, so the result simply takes their common type
-         binCond.setType(lhsType);
-     }
-     else {
-         int errCode = 1050;
-         String msg = "Unsupported input type for BinCond: left hand side: " + DataType.findTypeName(lhsType) + "; right hand side: " + DataType.findTypeName(rhsType);
-         msgCollector.collect(msg, MessageType.Error) ;
-         throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
-     }
-
-     try {
-         binCond.regenerateFieldSchema();
-     } catch (FrontendException fee) {
-         int errCode = 1040;
-         String msg = "Could not set BinCond field schema";
-         msgCollector.collect(msg, MessageType.Error);
-         throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee) ;
-     }
-
- }
-
- private void insertLeftCastForBinCond(LOBinCond binCond, byte toType, FieldSchema toFs) throws VisitorException {
- insertCast(binCond, toType, toFs, binCond.getLhsOp());
- }
-
- private void insertRightCastForBinCond(LOBinCond binCond, byte toType, FieldSchema toFs) throws VisitorException {
- insertCast(binCond, toType, toFs, binCond.getRhsOp());
- }
-
- /**
- * For Basic Types:
- * 0) Casting to itself is always ok
- * 1) Casting from number to number is always ok
- * 2) ByteArray to anything is ok
- * 3) (number or chararray) to (bytearray or chararray) is ok
- * For Composite Types:
- * Recursively traverse the schemas till you get a basic type
- *
- * What this method checks itself: a cast to bytearray is always rejected
- * (error 1051), and any other cast is accepted only if
- * Schema.FieldSchema.castable(castFs, inputFs) says so (error 1052
- * otherwise). When the input is a bytearray, or a LOUserFunc with an
- * implicit referenced operator, the load function to use for the cast is
- * resolved through the canonical map of the cast's field schema and set on
- * the cast operator.
- */
- @Override
- protected void visit(LOCast cast) throws VisitorException {
-
- byte inputType = cast.getExpression().getType();
- byte expectedType = cast.getType();
-
-
- if(expectedType == DataType.BYTEARRAY) {
- int errCode = 1051;
- String msg = "Cannot cast to bytearray";
- msgCollector.collect(msg, MessageType.Error) ;
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- Schema.FieldSchema castFs;
- Schema.FieldSchema inputFs;
- try {
- castFs = cast.getFieldSchema();
- inputFs = cast.getExpression().getFieldSchema();
- } catch(FrontendException fee) {
- int errCode = 1076;
- String msg = "Problem while reading field schema of cast operator.";
- throw new TypeCheckerException(msg, errCode, PigException.BUG, fee);
- }
- boolean castable = Schema.FieldSchema.castable(castFs, inputFs);
- if(!castable) {
- int errCode = 1052;
- String msg = "Cannot cast "
- + DataType.findTypeName(inputType)
- + ((DataType.isSchemaType(inputType))? " with schema " + inputFs : "")
- + " to "
- + DataType.findTypeName(expectedType)
- + ((DataType.isSchemaType(expectedType))? " with schema " + castFs : "");
- msgCollector.collect(msg, MessageType.Error) ;
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
-
- // cast.getType() already returns the correct type so don't have to
- // set here. This is a special case where output type is not
- // automatically determined.
-
- if(inputType == DataType.BYTEARRAY ||
- ( // a hack . need to add a caster for LOUserFunc if its for
- // scalar alias, as the dependency on predecessor LO is not
- // managed correctly, and might result in result type getting
- // set as bytearray later on
- cast.getExpression() instanceof LOUserFunc &&
- ((LOUserFunc)cast.getExpression()).getImplicitReferencedOperator() != null
- )
- ) {
- // NOTE(review): if VisitorException is a subclass of FrontendException,
- // the 2258 errors thrown inside this try block are caught by the catch
- // below and rewrapped as error 1053 - confirm this is intended.
- try {
- Map<String, LogicalOperator> canonicalMap = cast.getFieldSchema().getCanonicalMap();
- // two variables to ensure that only one load func is mapped to
- // the cast operator
- FuncSpec prevLoadFuncSpec = null;
- boolean prevLoadFuncSet = false;
- for( Map.Entry<String, LogicalOperator> entry : canonicalMap.entrySet() ) {
- FuncSpec loadFuncSpec = getLoadFuncSpec( entry.getValue(), entry.getKey() );
- // the first entry (null or not) becomes the reference all later entries must agree with
- if(!prevLoadFuncSet){
- prevLoadFuncSet = true;
- prevLoadFuncSpec = loadFuncSpec;
- }
- if(loadFuncSpec == null ){
- if(prevLoadFuncSpec != null){
- //BUG
- String msg = "Bug: A null and a non-null load function " +
- " mapped to cast through lineage";
- throw new VisitorException(msg, 2258, PigException.BUG);
- }else{
- // no load function so far either: nothing to set for this entry
- continue;
- }
- }
- // also reached when the reference is null and this entry is not
- if(!loadFuncSpec.equals(prevLoadFuncSpec)){
- //BUG
- String msg = "Bug:Two different load functions mapped to " +
- "an LOCast op";
- throw new VisitorException(msg, 2258, PigException.BUG);
- }
-
- cast.setLoadFuncSpec( loadFuncSpec );
- }
- } catch (FrontendException fee) {
- int errCode = 1053;
- String msg = "Cannot resolve load function to use for casting from " +
- DataType.findTypeName(inputType) + " to " +
- DataType.findTypeName(expectedType) + ". ";
- msgCollector.collect(msg, MessageType.Error);
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee);
- }
- }
- }
-
-
- /***********************************************************************/
- /* Relational Operators */
- /***********************************************************************/
- /*
- All the getType() of these operators always return BAG.
- We just have to :-
- 1) Check types of inputs, inner plans
- 2) Compute output schema with type information
- (At the moment, the parser does only return GetSchema with correct aliases)
- 3) Insert casting if necessary
-
- */
-
- /*
- The output schema of LOUnion is the merge of all input schemas.
- Operands on the left side always take precedence on aliases.
-
- We allow type promotion here.
- */
-
- // Type-checks a union. The cached schema of the union is dropped and
- // recomputed. In strict mode the input schemas are first merged one by one
- // only to detect inputs that cannot be merged (error 1054). If the union
- // ends up with a schema and is not a union-onschema, a casting foreach is
- // inserted in front of each input where necessary and then visited.
- @Override
- protected void visit(LOUnion u) throws VisitorException {
- u.unsetSchema();
- // Have to make a copy, because as we insert operators, this list will
- // change under us.
- List<LogicalOperator> inputs =
- new ArrayList<LogicalOperator>(u.getInputs());
-
- // There is no point to union only one operand
- // it should be a problem in the parser
- if (inputs.size() < 2) {
- throw new AssertionError("Union with Count(Operand) < 2") ;
- }
-
- Schema schema = null ;
- try {
-
- if (strictMode) {
- // Keep merging one by one just to check if there is
- // any problem with types in strict mode
- // (tmpSchema is only used for this check; the result is discarded)
- Schema tmpSchema = inputs.get(0).getSchema() ;
- for (int i=1; i< inputs.size() ;i++) {
- // Assume the first input's aliases take precedence
- tmpSchema = tmpSchema.merge(inputs.get(i).getSchema(), false) ;
-
- // if they cannot be merged, we just give up
- if (tmpSchema == null) {
- int errCode = 1054;
- String msg = "Cannot merge schemas from inputs of UNION" ;
- msgCollector.collect(msg, MessageType.Error) ;
- throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
- }
- }
- }
-
- // Compute the schema
- schema = u.getSchema() ;
-
- }
- catch (FrontendException fee) {
- int errCode = 1055;
- String msg = "Problem while reading schemas from inputs of Union" ;
- msgCollector.collect(msg, MessageType.Error) ;
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee) ;
- }
-
- // Do cast insertion only if we are typed
- // and if its not union-onschema. In case of union-onschema the
- // foreach with cast is added in UnionOnSchemaSetter
- if (schema != null && !u.isOnSchema()) {
- // Insert casting to inputs if necessary
- for (int i=0; i< inputs.size() ;i++) {
- LOForEach insertedOp
- = insertCastForEachInBetweenIfNecessary(inputs.get(i), u, schema) ;
-
- // We may have to compute the schema of the input again
- // because we have just inserted
- if (insertedOp != null) {
- // an inserted foreach without an alias inherits the alias of the input it wraps
- if(insertedOp.getAlias()==null){
- insertedOp.setAlias(inputs.get(i).getAlias());
- }
- try {
- this.visit(insertedOp);
- }
- catch (FrontendException fee) {
- int errCode = 1056;
- String msg = "Problem while casting inputs of Union" ;
- msgCollector.collect(msg, MessageType.Error) ;
- throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee) ;
- }
- }
- }
- }
- }
-
- /**
-  * Type checks a split output operator.
-  *
-  * The operator must have exactly one predecessor in the plan currently
-  * being walked, and its condition plan must have a single leaf whose
-  * type is boolean. The operator's schema is computed afterwards.
-  *
-  * @param op the split output operator to check
-  * @throws VisitorException if any of these checks fails or the schema
-  *         cannot be computed
-  */
- @Override
- protected void visit(LOSplitOutput op) throws VisitorException {
-     op.unsetSchema();
-
-     // LOSplitOutput can only have 1 input
-     LogicalPlan walkedPlan = mCurrentWalker.getPlan() ;
-     List<LogicalOperator> predecessors = walkedPlan.getPredecessors(op) ;
-     if (predecessors.size() != 1) {
-         String msg = "LOSplitOutput cannot have more than one input. Found: " + predecessors.size() + " input(s).";
-         throw new TypeCheckerException(msg, 2008, PigException.BUG) ;
-     }
-
-     // Check that the inner plan has only 1 output port
-     LogicalPlan conditionPlan = op.getConditionPlan() ;
-     if (!conditionPlan.isSingleLeafPlan()) {
-         String msg = "Split's inner plan can only have one output (leaf)" ;
-         msgCollector.collect(msg, MessageType.Error) ;
-         throw new TypeCheckerException(msg, 1057, PigException.INPUT) ;
-     }
-
-     checkInnerPlan(op.getAlias(), conditionPlan) ;
-
-     // The single leaf of the condition plan decides the condition type
-     byte conditionType = conditionPlan.getLeaves().get(0).getType() ;
-     if (conditionType != DataType.BOOLEAN) {
-         String msg = "Split's condition must evaluate to boolean. Found: " + DataType.findTypeName(conditionType) ;
-         msgCollector.collect(msg, MessageType.Error) ;
-         throw new TypeCheckerException(msg, 1058, PigException.INPUT) ;
-     }
-
-     try {
-         // Compute the schema
-         op.getSchema() ;
-     }
-     catch (FrontendException schemaFailure) {
-         String msg = "Problem while reading schemas from inputs of SplitOutput" ;
-         msgCollector.collect(msg, MessageType.Error) ;
-         throw new TypeCheckerException(msg, 1055, PigException.INPUT, schemaFailure) ;
-     }
- }
-
-
- /**
-  * LODistinct: the output schema should be the same as the input.
-  *
-  * Calls unsetSchema() on the operator and then getSchema() to compute
-  * the schema again.
-  *
-  * @param op the distinct operator to check
-  * @throws VisitorException if the schema cannot be computed
-  */
- @Override
- protected void visit(LODistinct op) throws VisitorException {
-     op.unsetSchema();
-
-     try {
-         // Compute the schema
-         op.getSchema() ;
-     }
-     catch (FrontendException schemaFailure) {
-         String msg = "Problem while reading schemas from inputs of Distinct" ;
-         msgCollector.collect(msg, MessageType.Error) ;
-         throw new TypeCheckerException(msg, 1055, PigException.INPUT, schemaFailure) ;
-     }
- }
-
- /**
-  * Type checks a limit operator by regenerating its schema.
-  *
-  * @param op the limit operator to check
-  * @throws VisitorException if the schema cannot be regenerated
-  */
- @Override
- protected void visit(LOLimit op) throws VisitorException {
-     try {
-         // Compute the schema
-         op.regenerateSchema() ;
-     }
-     catch (FrontendException schemaFailure) {
-         String msg = "Problem while reading schemas from inputs of Limit" ;
-         msgCollector.collect(msg, MessageType.Error) ;
-         throw new TypeCheckerException(msg, 1055, PigException.INPUT, schemaFailure) ;
-     }
- }
-
- /**
-  * The output of a cross is the concatenation of all fields from all
-  * input operators. If one of the inputs has no schema then we cannot
-  * construct the output schema.
-  *
-  * Calls unsetSchema() on the operator and then getSchema() to compute
-  * the schema again.
-  *
-  * @param cs the cross operator to check
-  * @throws VisitorException if the schema cannot be computed
-  */
- @Override
- protected void visit(LOCross cs) throws VisitorException {
-     cs.unsetSchema();
-
-     try {
-         // Compute the schema
-         cs.getSchema() ;
-     }
-     catch (FrontendException fe) {
-         int errCode = 1055;
-         String msg = "Problem while reading"
-             + " schemas from inputs of Cross" ;
-         msgCollector.collect(msg, MessageType.Error) ;
-         throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
-     }
- }
-
- /**
-  * The schema of sort output will be the same as sort input.
-  *
-  * Type checks every sort column plan (each must have a single leaf),
-  * copies the type of the sort's input onto the sort operator and then
-  * computes the sort's schema.
-  *
-  * @param s the sort operator to check
-  * @throws VisitorException if a sort column plan has more than one
-  *         leaf, fails type checking, or the schema cannot be computed
-  */
- @Override
- protected void visit(LOSort s) throws VisitorException {
-     s.unsetSchema();
-     LogicalOperator input = s.getInput() ;
-
-     // Type checking internal plans.
-     for(int i=0;i < s.getSortColPlans().size(); i++) {
-
-         LogicalPlan sortColPlan = s.getSortColPlans().get(i) ;
-
-         // Check that the inner plan has only 1 output port
-         if (!sortColPlan.isSingleLeafPlan()) {
-             int errCode = 1057;
-             String msg = "Sort's inner plan can only have one output (leaf)" ;
-             msgCollector.collect(msg, MessageType.Error) ;
-             throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
-         }
-
-         checkInnerPlan(s.getAlias(), sortColPlan) ;
-         // TODO: May have to check SortFunc compatibility here in the future
-
-     }
-
-     s.setType(input.getType()) ; // This should be bag always.
-
-     try {
-         // Compute the schema
-         s.getSchema() ;
-     }
-     catch (FrontendException fee) {
-         int errCode = 1059;
-         String msg = "Problem while reconciling output schema of Sort" ;
-         msgCollector.collect(msg, MessageType.Error);
-         throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee) ;
-     }
- }
-
-
- /**
-  * The schema of filter output will be the same as filter input.
-  *
-  * The filter's comparison plan must have a single leaf whose type is
-  * boolean. The filter's schema is computed afterwards.
-  *
-  * @param filter the filter operator to check
-  * @throws VisitorException if the comparison plan is malformed, is not
-  *         boolean, or the schema cannot be computed
-  */
- @Override
- protected void visit(LOFilter filter) throws VisitorException {
-     filter.unsetSchema();
-
-     // Check that the inner plan has only 1 output port
-     LogicalPlan conditionPlan = filter.getComparisonPlan() ;
-     if (!conditionPlan.isSingleLeafPlan()) {
-         String msg = "Filter's cond plan can only have one output (leaf)" ;
-         msgCollector.collect(msg, MessageType.Error) ;
-         throw new TypeCheckerException(msg, 1057, PigException.INPUT) ;
-     }
-
-     checkInnerPlan(filter.getAlias(), conditionPlan) ;
-
-     // The single leaf of the comparison plan decides the condition type
-     byte conditionType = conditionPlan.getLeaves().get(0).getType() ;
-     if (conditionType != DataType.BOOLEAN) {
-         String msg = "Filter's condition must evaluate to boolean. Found: " + DataType.findTypeName(conditionType);
-         msgCollector.collect(msg, MessageType.Error) ;
-         throw new TypeCheckerException(msg, 1058, PigException.INPUT) ;
-     }
-
-     try {
-         // Compute the schema
-         filter.getSchema() ;
-     }
-     catch (FrontendException schemaFailure) {
-         String msg = "Problem while reconciling output schema of Filter" ;
-         msgCollector.collect(msg, MessageType.Error);
-         throw new TypeCheckerException(msg, 1059, PigException.INPUT, schemaFailure) ;
-     }
- }
-
- /***
- * The schema of split output will be the same as split input
- */
-
- protected void visit(LOSplit split) throws VisitorException {
- // TODO: Why doesn't LOSplit have getInput() ???
- List<LogicalOperator> inputList = mPlan.getPredecessors(split) ;
-
- if (inputList.size() != 1) {
- int errCode = 2008;
- String msg = "LOSplit cannot have more than one input. Found: " + inputList.size() + " input(s).";
- throw new TypeCheckerException(msg, errCode, PigException.BUG) ;
- }
-
- try {
- // Compute the schema
- split.regenerateSchema() ;
- }
- catch (FrontendException fe) {
- int errCode = 1059;
[... 1062 lines stripped ...]