You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/10/28 02:48:15 UTC
svn commit: r1028153 - in /pig/trunk: ./
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/schema/ test/org/apache/pig/test/
Author: thejas
Date: Thu Oct 28 00:48:15 2010
New Revision: 1028153
URL: http://svn.apache.org/viewvc?rev=1028153&view=rev
Log:
PIG-1694: union-onschema projects null schema at parsing stage for some queries
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java
pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 28 00:48:15 2010
@@ -213,6 +213,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1694: union-onschema projects null schema at parsing stage for some queries (thejas)
+
PIG-1685: Pig is unable to handle counters for glob paths ? (daijy)
PIG-1683: New logical plan: Nested foreach plan fail if one inner alias is refered more than once (daijy)
Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Thu Oct 28 00:48:15 2010
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.PigException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.RequiredFields;
@@ -61,30 +62,42 @@ public class LOUnion extends RelationalO
log.debug("Number of predecessors in the graph: " + s.size());
try {
Iterator<LogicalOperator> iter = s.iterator();
- LogicalOperator op = iter.next();
- if (null == op) {
- int errCode = 1006;
- String msg = "Could not find operator in plan";
- throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
- }
- if (op.getSchema()!=null)
- mSchema = new Schema(op.getSchema());
- else
- mSchema = null;
- while(iter.hasNext()) {
- op = iter.next();
- if(null != mSchema) {
- mSchema = mSchema.merge(op.getSchema(), false);
- } else {
+ //create merged schema
+ if(isOnSchema){
+ // this function can be called in the parser, before
+ // the foreach statements that project the input columns
+ // as per the merged schema are set up, so we can't rely
+ // only on the regular union's schema merge logic
+ mSchema = createMergedSchemaOnAlias(iter);
+ }else{
+ //schema for regular union
+ LogicalOperator op = iter.next();
+ if (null == op) {
+ int errCode = 1006;
+ String msg = "Could not find operator in plan";
+ throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
+ }
+ if (op.getSchema()!=null)
+ mSchema = new Schema(op.getSchema());
+ else
mSchema = null;
- break;
+ while(iter.hasNext()) {
+ op = iter.next();
+ if(null != mSchema) {
+ mSchema = mSchema.merge(op.getSchema(), false);
+ } else {
+ mSchema = null;
+ break;
+ }
}
}
+
+ // set fieldschema parents
if(null != mSchema) {
for(Schema.FieldSchema fs: mSchema.getFields()) {
iter = s.iterator();
while(iter.hasNext()) {
- op = iter.next();
+ LogicalOperator op = iter.next();
Schema opSchema = op.getSchema();
if(null != opSchema) {
for(Schema.FieldSchema opFs: opSchema.getFields()) {
@@ -106,6 +119,49 @@ public class LOUnion extends RelationalO
return mSchema;
}
+ /**
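+ * Creates the schema for UNION ONSCHEMA by merging the input schemas by alias.
+ * @param iter iterator over the input operators whose schemas are merged
+ * @return the schema produced by Schema.mergeSchemasByAlias over all input schemas
+ * @throws FrontendException if an input has a null schema, has a column with a null alias, or the schemas cannot be merged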
+ */
+ private Schema createMergedSchemaOnAlias(Iterator<LogicalOperator> iter)
+ throws FrontendException {
+ ArrayList<Schema> schemas = new ArrayList<Schema>();
+ while(iter.hasNext()){
+ LogicalOperator lop = iter.next();
+ Schema sch;
+ sch = lop.getSchema();
+ if(sch == null)
+ {
+ String msg = "Schema of relation " + lop.getAlias()
+ + " is null."
+ + " UNION ONSCHEMA cannot be used with relations that"
+ + " have null schema.";
+ throw new FrontendException(msg, 1116, PigException.INPUT);
+ }
+ for(Schema.FieldSchema fs : sch.getFields()){
+ if(fs.alias == null){
+ String msg = "Schema of relation " + lop.getAlias()
+ + " has a null fieldschema for column(s). Schema :" +
+ sch;
+ throw new FrontendException(msg, 1116, PigException.INPUT);
+ }
+ }
+ schemas.add(sch);
+ }
+ //create the merged schema
+ Schema mergedSchema ;
+ try {
+ mergedSchema = Schema.mergeSchemasByAlias(schemas);
+ }catch(SchemaMergeException e) {
+ String msg = "Error merging schemas for union operator : "
+ + e.getMessage();
+ throw new FrontendException(msg, 1116, PigException.INPUT, e);
+ }
+ return mergedSchema;
+ }
+
@Override
public String name() {
return getAliasString() + "Union " + mKey.scope + "-" + mKey.id;
Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java Thu Oct 28 00:48:15 2010
@@ -70,40 +70,12 @@ public class UnionOnSchemaSetter extends
validator.validate(mPlan, collector);
List<LogicalOperator> preds = mPlan.getPredecessors(loUnion);
- //validate each input schema, and collect them in the ArrayList
- ArrayList<Schema> schemas = new ArrayList<Schema>(preds.size());
- for(LogicalOperator lop : preds){
- Schema sch;
- try {
- sch = lop.getSchema();
- } catch (FrontendException e) {
- throw new UnionOnSchemaSetException("Error getting schema from logical operator");
- }
- if(sch == null)
- {
- String msg = "Schema of relation " + lop.getAlias()
- + " is null."
- + " UNION ONSCHEMA cannot be used with relations that"
- + " have null schema.";
- throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT);
- }
- for(Schema.FieldSchema fs : sch.getFields()){
- if(fs.alias == null){
- String msg = "Schema of relation " + lop.getAlias()
- + " has a null fieldschema for column(s). Schema :" +
- sch;
- throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT);
- }
- }
- schemas.add(sch);
- }
-
//create the merged schema
Schema mergedSchema ;
try {
- mergedSchema = Schema.mergeSchemasByAlias(schemas);
- }catch(SchemaMergeException e) {
- String msg = "Error merging schemas for union operator : "
+ mergedSchema = loUnion.getSchema();
+ }catch(FrontendException e) {
+ String msg = "Error creating merged schemas for union-onschema operator : "
+ e.getMessage();
throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT, e);
}
Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu Oct 28 00:48:15 2010
@@ -871,6 +871,8 @@ public class Schema implements Serializa
return fs;
}
+
+
/**
* Given a field number, find the associated FieldSchema.
*
@@ -1676,6 +1678,12 @@ public class Schema implements Serializa
checkNullAlias(fs1, schema1);
FieldSchema fs2 = getFieldSubNameMatchThrowSchemaMergeException(schema2,fs1.alias);
if(fs2 != null){
+ if(schema2colsAdded.contains(fs2)){
+ // fs2 was already matched by an earlier field of schema1,
+ // so this alias matches multiple fields in schema1; looking
+ // it up in schema1 will throw the appropriate error.
+ getFieldSubNameMatchThrowSchemaMergeException(schema1, fs2.alias);
+ }
schema2colsAdded.add(fs2);
}
FieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1,fs2);
Modified: pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Thu Oct 28 00:48:15 2010
@@ -120,6 +120,9 @@ public class TestUnionOnSchema {
+ "u = union onschema l1, l2;"
;
Util.registerMultiLineQuery(pig, query);
+ Schema expectedSch = Util.getSchemaFromString("i: int, j: int");
+ Schema sch = pig.dumpSchema("u");
+ assertEquals("Checking expected schema",sch, expectedSch);
Iterator<Tuple> it = pig.openIterator("u");
List<Tuple> expectedRes =
@@ -136,6 +139,72 @@ public class TestUnionOnSchema {
/**
+ * Test UNION ONSCHEMA followed by a FILTER on the union's output
+ * @throws IOException
+ * @throws ParseException
+ */
+ @Test
+ public void testUnionOnSchemaFilter() throws IOException, ParseException {
+ PigServer pig = new PigServer(ExecType.LOCAL);
+ String query =
+ " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, x : int);"
+ + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
+ + "u = union onschema l1, l2;"
+ + "fil = filter u by i == 5 and (x is null or x != 1);"
+ ;
+ Util.registerMultiLineQuery(pig, query);
+
+ Schema sch = pig.dumpSchema("fil");
+ Schema expectedSch = Util.getSchemaFromString("i: int, x: int, j: int");
+ assertEquals("Checking expected schema",sch, expectedSch);
+
+
+ Iterator<Tuple> it = pig.openIterator("fil");
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(5,null,3)",
+ "(5,3,null)"
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+
+ /**
+ * Test UNION ONSCHEMA followed by ORDER, LIMIT and FILTER operations
+ * @throws IOException
+ * @throws ParseException
+ */
+ @Test
+ public void testUnionOnSchemaSuccOps() throws IOException, ParseException {
+ PigServer pig = new PigServer(ExecType.LOCAL);
+ String query =
+ " l1 = load '" + INP_FILE_2NUMS + "' as (i : int);"
+ + "l2 = load '" + INP_FILE_2NUMS + "' as (x : int, y : int);"
+ + "u = union onschema l1, l2;"
+ + "o = order u by i desc;"
+ + "lim = limit o 2;"
+ + "fil = filter lim by i == 5 and y is null;"
+ ;
+ Util.registerMultiLineQuery(pig, query);
+
+ Schema sch = pig.dumpSchema("fil");
+ Schema expectedSch = Util.getSchemaFromString("i: int, x: int, y: int");
+ assertEquals("Checking expected schema",sch, expectedSch);
+
+
+ Iterator<Tuple> it = pig.openIterator("fil");
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(5,null,null)",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+ /**
* Test UNION ONSCHEMA with cast from bytearray to another type
* @throws IOException
* @throws ParseException