You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/01/25 23:04:20 UTC
svn commit: r1063479 - in /pig/trunk:
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/newplan/logical/relational/
src/org/apache/pig/newplan/logical/visitor/ test/org/apache/pig/parser/
Author: thejas
Date: Tue Jan 25 22:04:19 2011
New Revision: 1063479
URL: http://svn.apache.org/viewvc?rev=1063479&view=rev
Log:
PIG-1618: Switch to new parser generator technology - NewParser-13.2.patch - (xuefuz via thejas)
Added:
pig/trunk/src/org/apache/pig/newplan/logical/visitor/
pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1063479&r1=1063478&r2=1063479&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Jan 25 22:04:19 2011
@@ -79,6 +79,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
+import org.apache.pig.newplan.logical.visitor.SortInfoSetter;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.pen.POOptimizeDisabler;
@@ -366,56 +367,6 @@ public class HExecutionEngine {
return newPreoptimizedPlan;
}
- public static class SortInfoSetter extends LogicalRelationalNodesVisitor {
-
- public SortInfoSetter(OperatorPlan plan) throws FrontendException {
- super(plan, new DependencyOrderWalker(plan));
- }
-
- @Override
- public void visit(LOStore store) throws FrontendException {
-
- Operator storePred = store.getPlan().getPredecessors(store).get(0);
- if(storePred == null){
- int errCode = 2051;
- String msg = "Did not find a predecessor for Store." ;
- throw new FrontendException(msg, errCode, PigException.BUG);
- }
-
- SortInfo sortInfo = null;
- if(storePred instanceof LOLimit) {
- storePred = store.getPlan().getPredecessors(storePred).get(0);
- } else if (storePred instanceof LOSplitOutput) {
- LOSplitOutput splitOutput = (LOSplitOutput)storePred;
- // We assume this is the LOSplitOutput we injected for this case:
- // b = order a by $0; store b into '1'; store b into '2';
- // In this case, we should mark both '1' and '2' as sorted
- LogicalExpressionPlan conditionPlan = splitOutput.getFilterPlan();
- if (conditionPlan.getSinks().size()==1) {
- Operator root = conditionPlan.getSinks().get(0);
- if (root instanceof ConstantExpression) {
- Object value = ((ConstantExpression)root).getValue();
- if (value instanceof Boolean && (Boolean)value==true) {
- Operator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
- if (split instanceof LOSplit)
- storePred = store.getPlan().getPredecessors(split).get(0);
- }
- }
- }
- }
- // if this predecessor is a sort, get
- // the sort info.
- if(storePred instanceof LOSort) {
- try {
- sortInfo = ((LOSort)storePred).getSortInfo();
- } catch (FrontendException e) {
- throw new FrontendException(e);
- }
- }
- store.setSortInfo(sortInfo);
- }
- }
-
public List<ExecJob> execute(PhysicalPlan plan,
String jobName) throws ExecException, FrontendException {
MapReduceLauncher launcher = new MapReduceLauncher();
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1063479&r1=1063478&r2=1063479&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Tue Jan 25 22:04:19 2011
@@ -23,12 +23,15 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
public class LOUnion extends LogicalRelationalOperator {
private boolean onSchema;
@@ -54,32 +57,47 @@ public class LOUnion extends LogicalRela
if (schema != null) {
return schema;
}
- List<Operator> inputs = null;
- inputs = plan.getPredecessors(this);
+ List<Operator> inputs = plan.getPredecessors(this);
// If any predecessor's schema is null, then the schema for union is null
for (Operator input : inputs) {
LogicalRelationalOperator op = (LogicalRelationalOperator)input;
- if (op.getSchema()==null)
- return null;
+ if( op.getSchema() == null ) {
+ if( isOnSchema() ) {
+ String msg = "Schema of relation " + op.getAlias()
+ + " is null."
+ + " UNION ONSCHEMA cannot be used with relations that"
+ + " have null schema.";
+ throw new FrontendException(msg, 1116, PigException.INPUT);
+
+ } else {
+ return null;
+ }
+ }
}
+ LogicalSchema mergedSchema = null;
LogicalSchema s0 = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
- if (inputs.size()==1)
- return s0;
- LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
- LogicalSchema mergedSchema = LogicalSchema.merge(s0, s1);
- if (mergedSchema==null)
- return null;
-
- // Merge schema
- for (int i=2;i<inputs.size();i++) {
- LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
- if (mergedSchema==null || otherSchema==null)
- return null;
- mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema);
- if (mergedSchema == null)
+ if ( inputs.size() == 1 )
+ return schema = s0;
+
+ if( isOnSchema() ) {
+ mergedSchema = createMergedSchemaOnAlias( inputs );
+ } else {
+ LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+ mergedSchema = LogicalSchema.merge(s0, s1);
+ if (mergedSchema==null)
return null;
+
+ // Merge schema
+ for (int i=2;i<inputs.size();i++) {
+ LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
+ if (mergedSchema==null || otherSchema==null)
+ return null;
+ mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema);
+ if (mergedSchema == null)
+ return null;
+ }
}
// Bring back cached uid if any; otherwise, cache uid generated
@@ -103,10 +121,41 @@ public class LOUnion extends LogicalRela
fs.uid = uid;
}
- schema = mergedSchema;
- return schema;
+ return schema = mergedSchema;
}
+ /**
+ * create schema for union-onschema
+ */
+ private LogicalSchema createMergedSchemaOnAlias(List<Operator> ops)
+ throws FrontendException {
+ ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
+ for( Operator op : ops ){
+ LogicalRelationalOperator lop = (LogicalRelationalOperator)op;
+ LogicalSchema sch = lop.getSchema();
+ for( LogicalFieldSchema fs : sch.getFields() ) {
+ if(fs.alias == null){
+ String msg = "Schema of relation " + lop.getAlias()
+ + " has a null fieldschema for column(s). Schema :" + sch;
+ throw new FrontendException( msg, 1116, PigException.INPUT );
+ }
+ }
+ schemas.add( sch );
+ }
+
+ //create the merged schema
+ LogicalSchema mergedSchema = null;
+ try {
+ mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );
+ } catch(SchemaMergeException e) {
+ String msg = "Error merging schemas for union operator : "
+ + e.getMessage();
+ throw new FrontendException(msg, 1116, PigException.INPUT, e);
+ }
+
+ return mergedSchema;
+ }
+
@Override
public void accept(PlanVisitor v) throws FrontendException {
if (!(v instanceof LogicalRelationalNodesVisitor)) {
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1063479&r1=1063478&r2=1063479&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Tue Jan 25 22:04:19 2011
@@ -20,12 +20,15 @@ package org.apache.pig.newplan.logical.r
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.pig.PigException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
@@ -168,6 +171,52 @@ public class LogicalSchema {
type, uid);
return newFs;
}
+
+ /***
+ * Compare two field schemas for equality
+
+ * @param relaxInner If true, we don't check inner tuple schemas
+ * @param relaxAlias If true, we don't check aliases
+ * @return true if FieldSchemas are equal, false otherwise
+ */
+ public static boolean equals(LogicalFieldSchema fschema,
+ LogicalFieldSchema fother,
+ boolean relaxInner,
+ boolean relaxAlias) {
+ if( fschema == null || fother == null ) {
+ return false ;
+ }
+
+ if( fschema.type != fother.type ) {
+ return false ;
+ }
+
+
+ if (!relaxAlias) {
+ if ( fschema.alias == null && fother.alias == null ) {
+ // good
+ } else if ( fschema.alias == null ) {
+ return false ;
+ } else if( !fschema.alias.equals( fother.alias ) ) {
+ return false ;
+ }
+ }
+
+ if ( (!relaxInner) && (DataType.isSchemaType(fschema.type))) {
+ // Don't do the comparison if both embedded schemas are
+ // null. That will cause Schema.equals to return false,
+ // even though we want to view that as true.
+ if (!(fschema.schema == null && fother.schema == null)) {
+ // compare recursively using schema
+ if (!LogicalSchema.equals(fschema.schema, fother.schema, false, relaxAlias)) {
+ return false ;
+ }
+ }
+ }
+
+ return true ;
+ }
+
}
private List<LogicalFieldSchema> fields;
@@ -228,6 +277,15 @@ public class LogicalSchema {
return fields.get(p.first);
}
+
+ public int getFieldPosition(String alias) {
+ Pair<Integer, Boolean> p = aliases.get( alias );
+ if( p == null ) {
+ return -1;
+ }
+
+ return p.first;
+ }
/**
* Fetch a field by field number
@@ -397,4 +455,227 @@ public class LogicalSchema {
newSchema.addField(getField(i).deepCopy());
return newSchema;
}
+
+ /**
+ * Merges collection of schemas using their column aliases
+ * (unlike mergeSchema(..) functions which merge using positions)
+ * Schema will not be merged if types are incompatible,
+ * as per DataType.mergeType(..)
+ * For Tuples and Bags, SubSchemas have to be equal to be considered compatible
+ * @param schemas - list of schemas to be merged using their column alias
+ * @return merged schema
+ */
+ public static LogicalSchema mergeSchemasByAlias(List<LogicalSchema> schemas)
+ throws SchemaMergeException{
+ LogicalSchema mergedSchema = null;
+
+ // list of schemas that have currently been merged, used in error message
+ ArrayList<LogicalSchema> mergedSchemas = new ArrayList<LogicalSchema>(schemas.size());
+ for(LogicalSchema sch : schemas){
+ if(mergedSchema == null){
+ mergedSchema = sch.deepCopy();
+ mergedSchemas.add(sch);
+ continue;
+ }
+ try{
+ mergedSchema = mergeSchemaByAlias( mergedSchema, sch );
+ mergedSchemas.add(sch);
+ }catch(SchemaMergeException e){
+ String msg = "Error merging schema: (" + sch + ") with "
+ + "merged schema: (" + mergedSchema + ")" + " of schemas : "
+ + mergedSchemas;
+ throw new SchemaMergeException(msg, e);
+ }
+ }
+ return mergedSchema;
+ }
+
+ /**
+ * Merges two schemas using their column aliases
+ * (unlike mergeSchema(..) functions which merge using positions)
+ * Schema will not be merged if types are incompatible,
+ * as per DataType.mergeType(..)
+ * For Tuples and Bags, SubSchemas have to be equal to be considered compatible
+ */
+ public static LogicalSchema mergeSchemaByAlias(LogicalSchema schema1, LogicalSchema schema2)
+ throws SchemaMergeException{
+ LogicalSchema mergedSchema = new LogicalSchema();
+ HashSet<LogicalFieldSchema> schema2colsAdded = new HashSet<LogicalFieldSchema>();
+ // add/merge fields present in first schema
+ for(LogicalFieldSchema fs1 : schema1.getFields()){
+ checkNullAlias(fs1, schema1);
+ LogicalFieldSchema fs2 = schema2.getField( fs1.alias );
+ if(fs2 != null){
+ if(schema2colsAdded.contains(fs2)){
+ // alias corresponds to multiple fields in schema1,
+ // just do a lookup on
+ // schema1 , that will throw the appropriate error.
+ schema1.getField( fs2.alias );
+ }
+ schema2colsAdded.add(fs2);
+ }
+ LogicalFieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1,fs2);
+ mergedSchema.addField( mergedFs );
+ }
+
+ //add fields from 2nd schema that are not already present in
+ // merged schema
+ for(LogicalFieldSchema fs2 : schema2.getFields()){
+ checkNullAlias(fs2, schema2);
+ if(! schema2colsAdded.contains(fs2)){
+ mergedSchema.addField( new LogicalFieldSchema( fs2 ) );
+ }
+ }
+ return mergedSchema;
+ }
+
+ private static void checkNullAlias(LogicalFieldSchema fs, LogicalSchema schema)
+ throws SchemaMergeException {
+ if(fs.alias == null){
+ throw new SchemaMergeException(
+ "Schema having field with null alias cannot be merged " +
+ "using alias. Schema :" + schema
+ );
+ }
+ }
+
+ /**
+ * Schema will not be merged if types are incompatible,
+ * as per DataType.mergeType(..)
+ * For Tuples and Bags, SubSchemas have to be equal to be considered compatible
+ * Aliases are assumed to be same for both
+ */
+ private static LogicalFieldSchema mergeFieldSchemaFirstLevelSameAlias(LogicalFieldSchema fs1,
+ LogicalFieldSchema fs2)
+ throws SchemaMergeException {
+ if(fs1 == null)
+ return fs2;
+ if(fs2 == null)
+ return fs1;
+
+ LogicalSchema innerSchema = null;
+
+ String alias = mergeNameSpacedAlias(fs1.alias, fs2.alias);
+
+ byte mergedType = DataType.mergeType(fs1.type, fs2.type) ;
+
+ // If the types cannot be merged
+ if (mergedType == DataType.ERROR) {
+ int errCode = 1031;
+ String msg = "Incompatible types for merging schemas. Field schema: "
+ + fs1 + " Other field schema: " + fs2;
+ throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
+ }
+ if(DataType.isSchemaType(mergedType)) {
+ // if one of them is a bytearray, pick inner schema of other one
+ if( fs1.type == DataType.BYTEARRAY ){
+ innerSchema = fs2.schema;
+ }else if(fs2.type == DataType.BYTEARRAY){
+ innerSchema = fs1.schema;
+ }
+ else {
+ //in case of types with inner schema such as bags and tuples
+ // the inner schema has to be same
+ if(!equals(fs1.schema, fs2.schema, false, false)){
+ int errCode = 1032;
+ String msg = "Incompatible types for merging inner schemas of " +
+ " Field schema type: " + fs1 + " Other field schema type: " + fs2;
+ throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
+ }
+ innerSchema = fs1.schema;
+ }
+ }
+
+ return new LogicalFieldSchema(alias, innerSchema, mergedType) ;
+ }
+
+ /**
+ * If one of the aliases is of form 'nm::str1', and other is of the form
+ * 'str1', this returns str1
+ */
+ private static String mergeNameSpacedAlias(String alias1, String alias2)
+ throws SchemaMergeException {
+ if(alias1.equals(alias2)){
+ return alias1;
+ }
+ if(alias1.endsWith("::" + alias2)){
+ return alias2;
+ }
+ if(alias2.endsWith("::" + alias1)){
+ return alias1;
+ }
+ //the aliases are different, alias cannot be merged
+ return null;
+ }
+
+ /**
+ * Recursively compare two schemas for equality
+ * @param schema
+ * @param other
+ * @param relaxInner if true, inner schemas will not be checked
+ * @param relaxAlias if true, aliases will not be checked
+ * @return true if schemas are equal, false otherwise
+ */
+ public static boolean equals(LogicalSchema schema,
+ LogicalSchema other,
+ boolean relaxInner,
+ boolean relaxAlias) {
+ // If both of them are null, they are equal
+ if ((schema == null) && (other == null)) {
+ return true ;
+ }
+
+ // otherwise
+ if (schema == null || other == null ) {
+ return false ;
+ }
+
+ /*
+ * Need to check for bags with schemas and bags with tuples that in turn have schemas.
+ * Retrieve the tuple schema of the bag if twoLevelAccessRequired
+ * Assuming that only bags exhibit this behavior and twoLevelAccessRequired is used
+ * with the right intentions
+ */
+ if(schema.isTwoLevelAccessRequired() || other.isTwoLevelAccessRequired()) {
+ if(schema.isTwoLevelAccessRequired()) {
+ schema = schema.getField(0).schema;
+ }
+
+ if(other.isTwoLevelAccessRequired()) {
+ other = other.getField(0).schema;
+ }
+
+ return LogicalSchema.equals(schema, other, relaxInner, relaxAlias);
+ }
+
+ if (schema.size() != other.size()) return false;
+
+ Iterator<LogicalFieldSchema> i = schema.fields.iterator();
+ Iterator<LogicalFieldSchema> j = other.fields.iterator();
+
+ while (i.hasNext()) {
+ LogicalFieldSchema myFs = i.next() ;
+ LogicalFieldSchema otherFs = j.next() ;
+
+ if (!relaxAlias) {
+ if( myFs.alias == null && otherFs.alias == null ) {
+ // good
+ } else if( myFs.alias == null ) {
+ return false ;
+ } else if( !myFs.alias.equals(otherFs.alias) ) {
+ return false ;
+ }
+ }
+
+ if (myFs.type != otherFs.type) {
+ return false ;
+ }
+
+ if (!relaxInner && !LogicalFieldSchema.equals( myFs, otherFs, false, relaxAlias ) ) {
+ // Compare recursively using field schema
+ return false ;
+ }
+ }
+ return true;
+ }
}
Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java?rev=1063479&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java Tue Jan 25 22:04:19 2011
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.visitor;
+
+import org.apache.pig.PigException;
+import org.apache.pig.SortInfo;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+
+public class SortInfoSetter extends LogicalRelationalNodesVisitor {
+
+ public SortInfoSetter(OperatorPlan plan) throws FrontendException {
+ super(plan, new DependencyOrderWalker(plan));
+ }
+
+ @Override
+ public void visit(LOStore store) throws FrontendException {
+
+ Operator storePred = store.getPlan().getPredecessors(store).get(0);
+ if(storePred == null){
+ int errCode = 2051;
+ String msg = "Did not find a predecessor for Store." ;
+ throw new FrontendException(msg, errCode, PigException.BUG);
+ }
+
+ SortInfo sortInfo = null;
+ if(storePred instanceof LOLimit) {
+ storePred = store.getPlan().getPredecessors(storePred).get(0);
+ } else if (storePred instanceof LOSplitOutput) {
+ LOSplitOutput splitOutput = (LOSplitOutput)storePred;
+ // We assume this is the LOSplitOutput we injected for this case:
+ // b = order a by $0; store b into '1'; store b into '2';
+ // In this case, we should mark both '1' and '2' as sorted
+ LogicalExpressionPlan conditionPlan = splitOutput.getFilterPlan();
+ if (conditionPlan.getSinks().size()==1) {
+ Operator root = conditionPlan.getSinks().get(0);
+ if (root instanceof ConstantExpression) {
+ Object value = ((ConstantExpression)root).getValue();
+ if (value instanceof Boolean && (Boolean)value==true) {
+ Operator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
+ if (split instanceof LOSplit)
+ storePred = store.getPlan().getPredecessors(split).get(0);
+ }
+ }
+ }
+ }
+ // if this predecessor is a sort, get
+ // the sort info.
+ if(storePred instanceof LOSort) {
+ try {
+ sortInfo = ((LOSort)storePred).getSortInfo();
+ } catch (FrontendException e) {
+ throw new FrontendException(e);
+ }
+ }
+ store.setSortInfo(sortInfo);
+ }
+}
Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java?rev=1063479&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java Tue Jan 25 22:04:19 2011
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+/**
+ * A visitor that modifies the logical plan (if necessary) for union-onschema
+ * functionality. It runs logical plan validator so that the correct schema
+ * of its inputs is available. It inserts foreach statements in its input
+ * if the input operator schema does not match the schema created by
+ * merging all input schemas.
+ *
+ * Migrated from the old UnionOnSchemaSetter class.
+ *
+ */
+public class UnionOnSchemaSetter extends LogicalRelationalNodesVisitor{
+
+ public UnionOnSchemaSetter(OperatorPlan plan)
+ throws FrontendException {
+ super(plan, new DependencyOrderWalker(plan));
+ }
+
+ @Override
+ public void visit(LOUnion union) throws FrontendException {
+ if( !union.isOnSchema() )
+ return;
+
+ LogicalSchema outputSchema = union.getSchema();
+ int fieldCount = outputSchema.size();
+ OperatorPlan plan = union.getPlan();
+ List<Operator> preds = new ArrayList<Operator>();
+ preds.addAll( plan.getPredecessors( union ) );
+
+ List<LogicalSchema> fieldSchemas = new ArrayList<LogicalSchema>( fieldCount );
+ for( LogicalFieldSchema fs : outputSchema.getFields() ) {
+ LogicalSchema ls = new LogicalSchema();
+ ls.addField( new LogicalFieldSchema( fs.alias, fs.schema, DataType.NULL ) );
+ fieldSchemas.add( ls );
+ }
+
+ for( Operator pred : preds ) {
+ LogicalRelationalOperator op = (LogicalRelationalOperator)pred;
+ LogicalSchema opSchema = op.getSchema();
+ if( opSchema.isEqual( outputSchema ) )
+ continue;
+
+ LOForEach foreach = new LOForEach( plan );
+ LogicalPlan innerPlan = new LogicalPlan();
+
+ LOGenerate gen = new LOGenerate( innerPlan );
+ boolean[] flattenFlags = new boolean[fieldCount];
+ List<LogicalExpressionPlan> exprPlans = new ArrayList<LogicalExpressionPlan>( fieldCount );
+ List<Operator> genInputs = new ArrayList<Operator>();
+
+ // Get exprPlans, and genInputs
+ for( LogicalFieldSchema fs : outputSchema.getFields() ) {
+ LogicalExpressionPlan exprPlan = new LogicalExpressionPlan();
+ exprPlans.add( exprPlan );
+ int pos = opSchema.getFieldPosition( fs.alias );
+ if( pos == -1 ) {
+ new ConstantExpression( exprPlan, null, fs );
+ } else {
+ ProjectExpression projExpr =
+ new ProjectExpression( exprPlan, genInputs.size(), 0, gen );
+ if( opSchema.getField( pos ).type != fs.type ) {
+ new CastExpression( exprPlan, projExpr, fs );
+ }
+ genInputs.add( new LOInnerLoad( innerPlan, foreach, pos ) );
+ }
+ }
+
+ gen.setFlattenFlags( flattenFlags );
+ gen.setOutputPlans( exprPlans );
+ gen.setOutputPlanSchemas( fieldSchemas );
+ innerPlan.add( gen );
+ for( Operator input : genInputs ) {
+ innerPlan.add(input);
+ innerPlan.connect( input, gen );
+ }
+
+ foreach.setInnerPlan( innerPlan );
+
+ Pair<Integer, Integer> pair = plan.disconnect( pred, union );
+ plan.add( foreach );
+ plan.connect( pred, pair.first, foreach, 0 );
+ plan.connect( foreach, 0, union, pair.second );
+ }
+
+ }
+
+}
Added: pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java?rev=1063479&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java (added)
+++ pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java Tue Jan 25 22:04:19 2011
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import junit.framework.Assert;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.visitor.UnionOnSchemaSetter;
+import org.junit.Test;
+
+public class TestUnionOnSchemaSetter {
+ @Test
+ public void test1() throws FrontendException {
+ String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
+ "B = load 'y' as ( u:int, v:int, z:long); " +
+ "C = union onschema A, B; " +
+ "D = store C into 'output';";
+ LogicalPlan plan = generateLogicalPlan( query );
+ if( plan != null ) {
+ int nodeCount = plan.size();
+ UnionOnSchemaSetter visitor = new UnionOnSchemaSetter( plan );
+ visitor.visit();
+ System.out.println( "Plan after setter: " + plan.toString() );
+ Assert.assertEquals( nodeCount + 2, plan.size() );
+ }
+ }
+
+ @Test
+ public void test2() throws FrontendException {
+ String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
+ "B = load 'y' as ( u:int, v:int, w:bytearray); " +
+ "C = union onschema A, B; " +
+ "D = store C into 'output';";
+ LogicalPlan plan = generateLogicalPlan( query );
+ if( plan != null ) {
+ int nc = plan.size();
+ UnionOnSchemaSetter visitor = new UnionOnSchemaSetter( plan );
+ visitor.visit();
+ System.out.println( "Plan after setter: " + plan.toString() );
+ Assert.assertEquals( nc + 1, plan.size() );
+ }
+ }
+
+ @Test
+ public void test3() throws FrontendException {
+ String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
+ "B = load 'y' as ( u:int, v:long, w:bytearray); " +
+ "C = union onschema A, B; " +
+ "D = store C into 'output';";
+ LogicalPlan plan = generateLogicalPlan( query );
+ if( plan != null ) {
+ int nc = plan.size();
+ UnionOnSchemaSetter visitor = new UnionOnSchemaSetter( plan );
+ visitor.visit();
+ System.out.println( "Plan after setter: " + plan.toString() );
+ Assert.assertEquals( nc, plan.size() );
+ }
+ }
+
+ @Test
+ public void testNegative1() {
+ String query = "A = load 'x' as ( u:int, v:long, w:chararray); " +
+ "B = load 'y' as ( u:int, v:long, w:long); " +
+ "C = union onschema A, B; " +
+ "D = store C into 'output';";
+ LogicalPlan plan = generateLogicalPlan( query );
+ if( plan != null ) {
+ UnionOnSchemaSetter visitor;
+ try {
+ visitor = new UnionOnSchemaSetter( plan );
+ visitor.visit();
+ } catch (FrontendException e) {
+ return; // Expect an exception.
+ }
+ }
+ Assert.fail( "Test case shouldn't pass!" );
+ }
+
+ private LogicalPlan generateLogicalPlan(String query) {
+ try {
+ return ParserTestingUtils.generateLogicalPlan( query );
+ } catch(Exception ex) {
+ Assert.fail( "Failed to generate logical plan for query [" + query + "] due to exception: " + ex );
+ }
+ return null;
+ }
+
+}