You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/10/19 00:45:05 UTC
svn commit: r1024052 - in /pig/branches/branch-0.8: ./ src/org/apache/pig/
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/parser/
src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/pig/test/
Author: thejas
Date: Mon Oct 18 22:45:04 2010
New Revision: 1024052
URL: http://svn.apache.org/viewvc?rev=1024052&view=rev
Log:
PIG-1673: query with consecutive union-onschema statement errors out
Added:
pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java
pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java
Modified:
pig/branches/branch-0.8/CHANGES.txt
pig/branches/branch-0.8/src/org/apache/pig/PigServer.java
pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java
pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java
Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Mon Oct 18 22:45:04 2010
@@ -200,6 +200,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1673: query with consecutive union-onschema statement errors out (thejas)
+
PIG-1653: Scripting UDF fails if the path to script is an absolute path (daijy)
PIG-1669: PushUpFilter fail when filter condition contains scalar (daijy)
Modified: pig/branches/branch-0.8/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/PigServer.java Mon Oct 18 22:45:04 2010
@@ -81,6 +81,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.logicalLayer.PlanSetter;
import org.apache.pig.impl.logicalLayer.ScalarFinder;
+import org.apache.pig.impl.logicalLayer.UnionOnSchemaSetter;
import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.parser.QueryParser;
@@ -1312,6 +1313,9 @@ public class PigServer {
PlanSetter ps = new PlanSetter(lp);
ps.visit();
+ UnionOnSchemaSetter setUnionOnSchema = new UnionOnSchemaSetter(lp, pigContext);
+ setUnionOnSchema.visit();
+
// run through validator
CompilationMessageCollector collector = new CompilationMessageCollector() ;
boolean isBeforeOptimizer = true;
Modified: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java Mon Oct 18 22:45:04 2010
@@ -18,29 +18,27 @@
package org.apache.pig.impl.logicalLayer;
import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
import java.util.Collection;
import java.util.Iterator;
-import java.util.Set;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.RequiredFields;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
public class LOUnion extends RelationalOperator {
private static final long serialVersionUID = 2L;
private static Log log = LogFactory.getLog(LOUnion.class);
+ private boolean isOnSchema = false;
/**
* @param plan
@@ -294,4 +292,18 @@ public class LOUnion extends RelationalO
super.pruneColumns(columns);
return true;
}
+
+ /** Sets whether this union is a UNION ONSCHEMA; the flag is false until set.
+ * @param isOnSchema the new value of the flag
+ */
+ public void setOnSchema(boolean isOnSchema) {
+ this.isOnSchema = isOnSchema;
+ }
+
+ /** Tells whether this union is a UNION ONSCHEMA.
+ * @return the value last given to setOnSchema, or false if it was never called
+ */
+ public boolean isOnSchema() {
+ return isOnSchema;
+ }
}
Added: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java?rev=1024052&view=auto
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java (added)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java Mon Oct 18 22:45:04 2010
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
+import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
+import org.apache.pig.impl.logicalLayer.validators.UnionOnSchemaSetException;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanValidationException;
+
+/**
+ * A visitor that modifies the logical plan (if necessary) for union-onschema
+ * functionality. It runs logical plan validator so that the correct schema
+ * of its inputs is available. It inserts foreach statements in its input
+ * if the input operator schema does not match the schema created by
+ * merging all input schemas
+ *
+ */
+public class UnionOnSchemaSetter extends LOVisitor {
+
+    /** Context passed to the plan validator that runs before schemas are read. */
+    private final PigContext pigContext;
+
+    /**
+     * Creates a visitor that walks the plan with a {@link DependencyOrderWalker}.
+     *
+     * @param plan logical plan that is inspected and modified in place
+     * @param pigContext context handed to the plan validator
+     */
+    public UnionOnSchemaSetter(LogicalPlan plan, PigContext pigContext) {
+        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+        this.pigContext = pigContext;
+    }
+
+    /**
+     * Aligns the inputs of a union-onschema operator with the schema obtained
+     * by merging all input schemas by alias. A foreach is inserted between the
+     * union and every input whose schema differs from the merged schema; it
+     * projects the existing columns (with a cast when the type differs) and
+     * null for columns the input does not have. Unions that are not marked
+     * as on-schema are left untouched.
+     *
+     * @param loUnion the union operator being visited
+     * @throws PlanValidationException propagated from the plan validation run
+     * @throws UnionOnSchemaSetException if an input has a null schema or a
+     *         column without alias, if the input schemas cannot be merged,
+     *         or if adding the foreach to the plan fails
+     */
+    public void visit(LOUnion loUnion) throws PlanValidationException, UnionOnSchemaSetException {
+        if(!loUnion.isOnSchema()) {
+            //Not union-onschema, nothing to be done
+            return;
+        }
+        // run through validator first on inputs so that the schemas have the right
+        //types for columns. It will run TypeCheckingValidator as well.
+        // The compilation messages will be logged when validation is
+        // done from PigServer, so not doing it here
+        CompilationMessageCollector collector = new CompilationMessageCollector();
+        boolean isBeforeOptimizer = true;
+        LogicalPlanValidationExecutor validator =
+            new LogicalPlanValidationExecutor(mPlan, pigContext, isBeforeOptimizer);
+        validator.validate(mPlan, collector);
+
+        // work on a copy of the predecessor list: the plan edges of the union
+        // are changed below while the inputs are being iterated
+        List<LogicalOperator> preds =
+            new ArrayList<LogicalOperator>(mPlan.getPredecessors(loUnion));
+
+        //create the merged schema
+        ArrayList<Schema> schemas = collectInputSchemas(preds);
+        Schema mergedSchema;
+        try {
+            mergedSchema = Schema.mergeSchemasByAlias(schemas);
+        }catch(SchemaMergeException e) {
+            String msg = "Error merging schemas for union operator : "
+                + e.getMessage();
+            throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT, e);
+        }
+
+        //create a user defined schema list for use in LOForeach
+        // using merged schema
+        ArrayList<Schema> mergedSchemaList = new ArrayList<Schema>();
+        for(Schema.FieldSchema fs : mergedSchema.getFields()){
+            // Use NULL datatype because the type will be set by the TypeChecking
+            // visitors
+            mergedSchemaList.add(
+                    new Schema(new Schema.FieldSchema(fs.alias, DataType.NULL))
+            );
+        }
+
+        // add a foreach for inputs that don't match mergedSchema, projecting
+        // null for columns that don't exist in the input
+        String scope = loUnion.getOperatorKey().getScope();
+        for(LogicalOperator lop : preds) {
+            try {
+                Schema inpSchema = lop.getSchema();
+                if(inpSchema.equals(mergedSchema)) {
+                    // input already has the merged schema, no foreach needed
+                    continue;
+                }
+                //the mergedSchema is different from this operators schema
+                // so add a foreach to project columns appropriately
+                int mergeSchSz = mergedSchema.size();
+                ArrayList<LogicalPlan> generatePlans =
+                    new ArrayList<LogicalPlan>(mergeSchSz);
+                ArrayList<Boolean> flattenList =
+                    new ArrayList<Boolean>(mergeSchSz);
+                for(Schema.FieldSchema fs : mergedSchema.getFields()) {
+                    flattenList.add(Boolean.FALSE);
+                    generatePlans.add(createColumnPlan(lop, inpSchema, fs, scope));
+                }
+                LogicalOperator foreach = new LOForEach(
+                        mPlan,
+                        getNextId(scope),
+                        generatePlans, flattenList,
+                        mergedSchemaList
+                );
+                mPlan.add(foreach);
+                mPlan.insertBetween(lop, foreach, loUnion);
+            }
+            catch (FrontendException e) {
+                String msg = "Error adding union operator " + loUnion.getAlias()
+                    + ":" + e.getMessage();
+                throw new UnionOnSchemaSetException(msg, e);
+            }
+        }
+    }
+
+    /**
+     * Checks that every union input has a schema in which all columns carry
+     * an alias, and returns the schemas in input order.
+     *
+     * @param preds the inputs of the union
+     * @return the schema of each input
+     * @throws UnionOnSchemaSetException if a schema cannot be obtained, is
+     *         null, or contains a column without alias
+     */
+    private ArrayList<Schema> collectInputSchemas(List<LogicalOperator> preds)
+            throws UnionOnSchemaSetException {
+        //validate each input schema, and collect them in the ArrayList
+        ArrayList<Schema> schemas = new ArrayList<Schema>(preds.size());
+        for(LogicalOperator lop : preds){
+            Schema sch;
+            try {
+                sch = lop.getSchema();
+            } catch (FrontendException e) {
+                // keep the underlying failure as the cause instead of dropping it
+                throw new UnionOnSchemaSetException(
+                        "Error getting schema from logical operator", e);
+            }
+            if(sch == null) {
+                String msg = "Schema of relation " + lop.getAlias()
+                    + " is null."
+                    + " UNION ONSCHEMA cannot be used with relations that"
+                    + " have null schema.";
+                throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT);
+            }
+            for(Schema.FieldSchema fs : sch.getFields()){
+                if(fs.alias == null){
+                    String msg = "Schema of relation " + lop.getAlias()
+                        + " has a null fieldschema for column(s). Schema :"
+                        + sch;
+                    throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT);
+                }
+            }
+            schemas.add(sch);
+        }
+        return schemas;
+    }
+
+    /**
+     * Builds the generate plan that produces one column of the merged schema
+     * from the given input: a projection of the matching input column, or a
+     * null constant when the input has no such column, followed by a cast to
+     * the merged type where the types differ.
+     *
+     * @param lop the union input the foreach will read from
+     * @param inpSchema schema of {@code lop}
+     * @param fs the merged-schema column to produce
+     * @param scope scope used for the keys of the new operators
+     * @return the plan for the column
+     * @throws FrontendException if the column lookup or plan construction fails
+     */
+    private LogicalPlan createColumnPlan(LogicalOperator lop, Schema inpSchema,
+            Schema.FieldSchema fs, String scope) throws FrontendException {
+        LogicalPlan projectPlan = new LogicalPlan();
+        int inpPos = inpSchema.getPositionSubName(fs.alias);
+
+        LogicalOperator columnProj;
+        boolean isCastNeeded;
+        if(inpPos == -1){
+            //the column is not present in schema of this input,
+            // so project null
+            // NOTE(review): the constant is created with the outer plan (mPlan)
+            // but added to projectPlan, unlike the LOProject/LOCast here - confirm
+            columnProj = new LOConst(mPlan, getNextId(scope), null);
+            // cast is necessary if the type in schema is
+            // not a BYTEARRAY
+            isCastNeeded = fs.type != DataType.BYTEARRAY;
+        }else {
+            //project the column from input
+            columnProj = new LOProject(projectPlan, getNextId(scope), lop, inpPos);
+
+            //cast is needed if types are different.
+            //compatibility of types has already been checked
+            //during creation of mergedSchema
+            Schema.FieldSchema inpFs = inpSchema.getFieldSubNameMatch(fs.alias);
+            isCastNeeded = inpFs.type != fs.type;
+        }
+        projectPlan.add(columnProj);
+
+        //add a LOCast if necessary
+        if(isCastNeeded){
+            LOCast loCast = new LOCast(projectPlan, getNextId(scope), fs.type);
+            loCast.setFieldSchema(fs);
+            projectPlan.add(loCast);
+            projectPlan.connect(columnProj, loCast);
+        }
+        return projectPlan;
+    }
+
+    /**
+     * Creates an operator key with the next node id generated for the scope.
+     *
+     * @param scope scope of the new key
+     * @return the new operator key
+     */
+    private OperatorKey getNextId(String scope) {
+        return new OperatorKey(
+                scope,
+                NodeIdGenerator.getGenerator().getNextNodeId(scope)
+        );
+    }
+
+}
Modified: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Oct 18 22:45:04 2010
@@ -2261,158 +2261,12 @@ LogicalOperator UnionClause(LogicalPlan
// to ParseException. Otherwise, if any exception than ParseException
// is thrown , the generated parse code tries to cast
//the exception to Error, resulting in a misleading error message
-
- if(isOnSchema) {
- // run through validator first on inputs so that the schemas have the right
- //types for columns. It will run TypeCheckingValidator as well.
- // The compilation messages will be logged when validation is
- // done from PigServer, so not doing it here
- CompilationMessageCollector collector = new CompilationMessageCollector() ;
- boolean isBeforeOptimizer = true;
- LogicalPlanValidationExecutor validator =
- new LogicalPlanValidationExecutor(lp, pigContext, isBeforeOptimizer);
- validator.validate(lp, collector);
- }
-
- LogicalOperator union = new LOUnion(lp, new OperatorKey(scope, getNextId()));
+ LOUnion union = new LOUnion(lp, new OperatorKey(scope, getNextId()));
+ union.setOnSchema(isOnSchema);
lp.add(union);
- log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
-
- if(isOnSchema)
- { // this is UNION ONSCHEMA, find merged schema
- // and (if necessary) add foreach to align columns
-
-
- ArrayList<Schema> schemas = new ArrayList<Schema>(inputs.size());
- for(LogicalOperator lop : inputs){
- Schema sch = lop.getSchema();
- if(sch == null)
- {
- String msg = "Schema of relation " + lop.getAlias()
- + " is null."
- + " UNION ONSCHEMA cannot be used with relations that"
- + " have null schema.";
- throw new ParseException(msg);
- }
- for(Schema.FieldSchema fs : sch.getFields()){
- if(fs.alias == null){
- String msg = "Schema of relation " + lop.getAlias()
- + " has a null fieldschema for column(s). Schema :" +
- sch;
- throw new ParseException(msg);
- }
- }
- schemas.add(sch);
- }
- Schema mergedSchema ;
- try {
- mergedSchema = Schema.mergeSchemasByAlias(schemas);
- }catch(SchemaMergeException e) {
- String msg = "Error merging schemas for union operator : "
- + e.getMessage();
- ParseException pe = new ParseException(msg);
- pe.initCause(e);
- throw pe;
- }
- // add a foreach for inputs that don't match mergedSchema, projecting
- // null for columns that don't exist in the input
- ArrayList<LogicalOperator> newInputs =
- new ArrayList<LogicalOperator>(inputs.size());
-
- //create a user defined schema list for use in LOForeach
- // using merged schema
- ArrayList<Schema> mergedSchemaList = new ArrayList<Schema>();
- for(Schema.FieldSchema fs : mergedSchema.getFields()){
- mergedSchemaList.add(new Schema(new Schema.FieldSchema(fs.alias, DataType.NULL)));
- }
-
-
- for(LogicalOperator lop : inputs)
- {
- if(! lop.getSchema().equals(mergedSchema))
- {
- //the mergedSchema is different from this operators schema
- // so add a foreach to project columns appropriately
- int mergeSchSz = mergedSchema.size();
- ArrayList<LogicalPlan> generatePlans =
- new ArrayList<LogicalPlan>(mergeSchSz);
- ArrayList<Boolean> flattenList =
- new ArrayList<Boolean>(mergeSchSz);
-
- for(Schema.FieldSchema fs : mergedSchema.getFields()) {
- LogicalPlan projectPlan = new LogicalPlan();
- Schema inpSchema = lop.getSchema();
- flattenList.add(Boolean.FALSE);
-
- int inpPos = inpSchema.getPositionSubName(fs.alias);
-
- LogicalOperator columnProj = null;
- boolean isCastNeeded = false;
- if(inpPos == -1){
- //the column is not present in schema of this input,
- // so project null
- columnProj =
- new LOConst(lp,
- new OperatorKey(scope, getNextId()),
- null
- );
- // cast is necessary if the type in schema is
- // not a BYTEARRAY
- if(fs.type != DataType.BYTEARRAY){
- isCastNeeded = true;
- }
- }else {
- //project the column from input
- columnProj =
- new LOProject(projectPlan,
- new OperatorKey(scope, getNextId()),
- lop, inpPos
- );
-
- //cast is needed if types are different.
- //compatibility of types has already been checked
- //during creation of mergedSchema
- Schema.FieldSchema inpFs = inpSchema.getFieldSubNameMatch(fs.alias);
- if(inpFs.type != fs.type)
- isCastNeeded = true;
- }
- projectPlan.add(columnProj);
-
- //add a LOCast if necessary
- if(isCastNeeded){
- LOCast loCast = new LOCast(projectPlan,
- new OperatorKey(scope, getNextId()),
- fs.type
- );
- loCast.setFieldSchema(fs);
- projectPlan.add(loCast);
- projectPlan.connect(columnProj, loCast);
- }
- generatePlans.add(projectPlan);
-
- }
- LogicalOperator foreach = new LOForEach(lp,
- new OperatorKey(scope, getNextId()),
- generatePlans, flattenList,
- mergedSchemaList
- );
- lp.add(foreach);
- lp.connect(lop, foreach);
- newInputs.add(foreach);
- }else {
- // schema of input is same as mergedSchema,
- //no additional foreach is required
- newInputs.add(lop);
- }
-
- }
- // use newInputs as the inputs for union
- inputs = newInputs;
- }
-
+ log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
-
for (LogicalOperator lop: inputs) {
lp.connect(lop, union);
log.debug("Connected union input operator " +
@@ -2423,10 +2277,6 @@ LogicalOperator UnionClause(LogicalPlan
log.trace("Exiting UnionClause");
return union;
}
- catch(ParseException e){
- // its already a ParseException, just throw it.
- throw e;
- }
catch(Exception e){
ParseException pe = new ParseException();
pe.initCause(e);
Modified: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Mon Oct 18 22:45:04 2010
@@ -1892,8 +1892,10 @@ public class TypeCheckingVisitor extends
throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee) ;
}
- // Do cast insertion only if we are typed
- if (schema != null) {
+ // Do cast insertion only if we are typed
+ // and if it's not union-onschema. In case of union-onschema the
+ // foreach with cast is added in UnionOnSchemaSetter
+ if (schema != null && !u.isOnSchema()) {
// Insert casting to inputs if necessary
for (int i=0; i< inputs.size() ;i++) {
LOForEach insertedOp
Added: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java?rev=1024052&view=auto
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java (added)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java Mon Oct 18 22:45:04 2010
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.validators;
+
+import org.apache.pig.impl.plan.VisitorException;
+/** Exception thrown by UnionOnSchemaSetter when preparing a union-onschema operator fails. */
+public class UnionOnSchemaSetException extends VisitorException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Create a new UnionOnSchemaSetException with null as the error message.
+ */
+ public UnionOnSchemaSetException() {
+ super();
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ */
+ public UnionOnSchemaSetException(String message) {
+ super(message);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified cause.
+ *
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public UnionOnSchemaSetException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message and cause.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public UnionOnSchemaSetException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message and error code.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ */
+ public UnionOnSchemaSetException(String message, int errCode) {
+ super(message, errCode);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message, error code and cause.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public UnionOnSchemaSetException(String message, int errCode, Throwable cause) {
+ super(message, errCode, cause);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message, error code and error source.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ */
+ public UnionOnSchemaSetException(String message, int errCode, byte errSrc) {
+ super(message, errCode, errSrc);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message, error code, error source and cause.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public UnionOnSchemaSetException(String message, int errCode, byte errSrc,
+ Throwable cause) {
+ super(message, errCode, errSrc, cause);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message, error code and retriable flag.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param retry - If the exception is retriable or not
+ */
+ public UnionOnSchemaSetException(String message, int errCode, boolean retry) {
+ super(message, errCode, retry);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message, error code, error source and retriable flag.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ * @param retry - If the exception is retriable or not
+ */
+ public UnionOnSchemaSetException(String message, int errCode, byte errSrc,
+ boolean retry) {
+ super(message, errCode, errSrc, retry);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message, error code, error source, retriable or not and detailed message for the developer.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ * @param retry - If the exception is retriable or not
+ * @param detailedMsg - The detailed message shown to the developer
+ */
+ public UnionOnSchemaSetException(String message, int errCode, byte errSrc,
+ boolean retry, String detailedMsg) {
+ super(message, errCode, errSrc, retry, detailedMsg);
+ }
+
+ /**
+ * Create a new UnionOnSchemaSetException with the specified message, error code, error source, retriable or not, detailed message for the developer and cause.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ * @param retry - If the exception is retriable or not
+ * @param detailedMsg - The detailed message shown to the developer
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public UnionOnSchemaSetException(String message, int errCode, byte errSrc,
+ boolean retry, String detailedMsg, Throwable cause) {
+ super(message, errCode, errSrc, retry, detailedMsg, cause);
+ }
+
+}
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java Mon Oct 18 22:45:04 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
@@ -40,6 +41,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.LogUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -523,9 +525,11 @@ public class TestUnionOnSchema {
boolean foundEx = false;
try{
Util.registerMultiLineQuery(pig, query);
+ pig.dumpSchema("u");
}catch(FrontendException e){
+ PigException pigEx = LogUtils.getPigException(e);
foundEx = true;
- if(!e.getMessage().contains(expectedErr)){
+ if(!pigEx.getMessage().contains(expectedErr)){
String msg = "Expected exception message matching '"
+ expectedErr + "' but got '" + e.getMessage() + "'" ;
fail(msg);
@@ -686,6 +690,60 @@ public class TestUnionOnSchema {
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
+
+
+ /**
+ * Test UNION ONSCHEMA with udf whose default type is different from
+ * final type - where udf is not in immediate input of union
+ * @throws IOException
+ * @throws ParseException
+ */
+ @Test
+ public void testUnionOnSchemaUdfTypeEvolution2() throws IOException, ParseException {
+ PigServer pig = new PigServer(ExecType.LOCAL);
+ String query_prefix =
+ " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+ + " (i : int, c : chararray, j : int "
+ + ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
+ + ", t : tuple (tc1 : int, tc2 : chararray) );"
+ + " l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+ + " (i : int, c : chararray, j : int "
+ + ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
+ + ", t : tuple (tc1 : int, tc2 : chararray) );"
+ + "f1 = foreach l1 generate i, MAX(b.c1) as mx;"
+ + "f11 = foreach f1 generate i, mx;"
+ + "f2 = foreach l2 generate i, COUNT(b.c1) as mx;"
+ + "f22 = foreach f2 generate i, mx;"
+
+ ;
+ String query = query_prefix + "u = union onschema f11, f22;";
+ Util.registerMultiLineQuery(pig, query);
+ Schema sch = pig.dumpSchema("u");
+ Schema expectedSch =
+ Util.getSchemaFromString("i: int, mx: long");
+ assertEquals("Checking expected schema", expectedSch, sch); // expected value goes first
+
+ // verify schema for reverse order of relations as well
+ query = query_prefix + "u = union onschema f22, f11;";
+ Util.registerMultiLineQuery(pig, query);
+ sch = pig.dumpSchema("u");
+ expectedSch =
+ Util.getSchemaFromString("i: int, mx: long");
+ assertEquals("Checking expected schema", expectedSch, sch); // expected value goes first
+
+
+ Iterator<Tuple> it = pig.openIterator("u");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(1,1L)",
+ "(5,2L)",
+ "(1,2L)",
+ "(5,2L)"
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+ }
/**
* Udf that has schema of tuple column with no inner schema
@@ -753,5 +811,35 @@ public class TestUnionOnSchema {
}
-
+ /**
+ * Test a query in which the result of one union-onschema is an input of another
+ * @throws IOException
+ * @throws ParseException
+ */
+ @Test
+ public void testTwoUnions() throws IOException, ParseException {
+ PigServer pig = new PigServer(ExecType.LOCAL);
+ String query =
+ " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
+ + "l2 = load '" + INP_FILE_2NUMS + "' as (i : long, j : int);"
+ + "u1 = union onschema l1, l2;" // first union-onschema
+ + "l3 = load '" + INP_FILE_2NUMS + "' as (i : long, j : double);"
+ + "u2 = union onschema u1, l3;" // second union-onschema, fed by the first
+ ;
+ Util.registerMultiLineQuery(pig, query);
+ Iterator<Tuple> it = pig.openIterator("u2");
+
+ // every row is expected with i as long and j as double, for all three loads
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(1L,2.0)",
+ "(5L,3.0)",
+ "(1L,2.0)",
+ "(5L,3.0)",
+ "(1L,2.0)",
+ "(5L,3.0)"
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
}