You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2017/12/27 15:55:23 UTC
svn commit: r1819344 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/newplan/logical/relational/LOUnion.java
test/org/apache/pig/test/TestUnionOnSchema.java
Author: knoguchi
Date: Wed Dec 27 15:55:23 2017
New Revision: 1819344
URL: http://svn.apache.org/viewvc?rev=1819344&view=rev
Log:
PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1819344&r1=1819343&r2=1819344&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Dec 27 15:55:23 2017
@@ -59,6 +59,8 @@ PIG-5251: Bump joda-time to 2.9.9 (dbist
OPTIMIZATIONS
BUG FIXES
+PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi)
+
PIG-5300: hashCode for Bag needs to be order independent (knoguchi)
PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1819344&r1=1819343&r2=1819344&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Wed Dec 27 15:55:23 2017
@@ -108,11 +108,51 @@ public class LOUnion extends LogicalRela
}
// Bring back cached uid if any; otherwise, cache uid generated
- for (int i=0;i<mergedSchema.size();i++)
- {
+ setMergedSchemaUids(mergedSchema, inputSchemas);
+
+ return schema = mergedSchema;
+ }
+
+ /**
+ * create schema for union-onschema
+ */
+ private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> inputSchemas,
+ List<String> inputAliases)
+ throws FrontendException {
+ ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
+ for (int i = 0; i < inputSchemas.size(); i++){
+ LogicalSchema sch = inputSchemas.get(i);
+ for( LogicalFieldSchema fs : sch.getFields() ) {
+ if(fs.alias == null){
+ String msg = "Schema of relation " + inputAliases.get(i)
+ + " has a null fieldschema for column(s). Schema :" + sch.toString(false);
+ throw new FrontendException( this, msg, 1116, PigException.INPUT );
+ }
+ }
+ schemas.add( sch );
+ }
+
+ //create the merged schema
+ LogicalSchema mergedSchema = null;
+ try {
+ mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );
+ } catch(FrontendException e) {
+ String msg = "Error merging schemas for union operator : "
+ + e.getMessage();
+ throw new FrontendException(this, msg, 1116, PigException.INPUT, e);
+ }
+
+ return mergedSchema;
+ }
+
+ private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas)
+ throws FrontendException {
+
+ for (int i=0;i<mergedSchema.size();i++) {
LogicalSchema.LogicalFieldSchema outputFieldSchema = mergedSchema.getField(i);
long uid = -1;
+ List<LogicalSchema> fieldInputSchemas = new ArrayList<>(inputSchemas.size());
// Search all the cached uid mappings by input field to see if
// we've cached an output uid for this output field
@@ -125,8 +165,14 @@ public class LOUnion extends LogicalRela
}
if (inputFieldSchema != null) {
- uid = getCachedOuputUid(inputFieldSchema.uid);
- if (uid >= 0) break;
+ if (inputFieldSchema.schema != null) {
+ fieldInputSchemas.add(inputFieldSchema.schema);
+ }
+
+ if (uid < 0) {
+ uid = getCachedOuputUid(inputFieldSchema.uid);
+ if (uid >= 0 && outputFieldSchema.schema == null) break;
+ }
}
}
@@ -136,8 +182,8 @@ public class LOUnion extends LogicalRela
for (LogicalSchema inputSchema : inputSchemas) {
long inputUid;
LogicalFieldSchema matchedInputFieldSchema;
- if (onSchema) {
- matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
+ if (onSchema) {
+ matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
if (matchedInputFieldSchema!=null) {
inputUid = matchedInputFieldSchema.uid;
uidMapping.add(new Pair<Long, Long>(uid, inputUid));
@@ -145,50 +191,21 @@ public class LOUnion extends LogicalRela
}
else {
matchedInputFieldSchema = mergedSchema.getField(i);
- inputUid = inputSchema.getField(i).uid;
- uidMapping.add(new Pair<Long, Long>(uid, inputUid));
+ inputUid = inputSchema.getField(i).uid;
+ uidMapping.add(new Pair<Long, Long>(uid, inputUid));
}
}
}
outputFieldSchema.uid = uid;
- }
-
- return schema = mergedSchema;
- }
- /**
- * create schema for union-onschema
- */
- private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> inputSchemas,
- List<String> inputAliases)
- throws FrontendException {
- ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
- for (int i = 0; i < inputSchemas.size(); i++){
- LogicalSchema sch = inputSchemas.get(i);
- for( LogicalFieldSchema fs : sch.getFields() ) {
- if(fs.alias == null){
- String msg = "Schema of relation " + inputAliases.get(i)
- + " has a null fieldschema for column(s). Schema :" + sch.toString(false);
- throw new FrontendException( this, msg, 1116, PigException.INPUT );
- }
+ // This field has a schema. Assign uids to it as well
+ if (outputFieldSchema.schema != null) {
+ setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas);
}
- schemas.add( sch );
}
-
- //create the merged schema
- LogicalSchema mergedSchema = null;
- try {
- mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );
- } catch(FrontendException e) {
- String msg = "Error merging schemas for union operator : "
- + e.getMessage();
- throw new FrontendException(this, msg, 1116, PigException.INPUT, e);
- }
-
- return mergedSchema;
}
-
+
private long getCachedOuputUid(long inputUid) {
long uid = -1;
Modified: pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1819344&r1=1819343&r2=1819344&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Wed Dec 27 15:55:23 2017
@@ -478,6 +478,43 @@ public class TestUnionOnSchema {
* Test UNION ONSCHEMA on 3 inputs
*/
@Test
+ public void testUnionOnSchemaInnerSchema() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
+ String query =
+ " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+ + " (i : long, c : chararray, j : int "
+ + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
+ + "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+ + " (i : long, c : chararray, j : int "
+ + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
+ + "u = union onschema l1, l2; "
+ // The addition in the inner foreach will fail if the inner schema's uids
+ // are all set to -1, since the code that finds the inner load's schema will
+ // match the last item in b's schema, which is a chararray
+ + "p = foreach u { x = foreach b GENERATE c1 + 5 as c3; GENERATE i, c, x; }";
+
+ Util.registerMultiLineQuery(pig, query);
+ pig.explain("p", System.out);
+
+ Iterator<Tuple> it = pig.openIterator("p");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(1L,'abc',{(6),(6)})",
+ "(5L,'def',{(7),(7)})",
+ "(1L,'abc',{(6),(6)})",
+ "(5L,'def',{(7),(7)})"
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+ }
+
+ /**
+ * Test UNION ONSCHEMA on 3 inputs
+ * @throws IOException
+ * @throws ParserException
+ */
+ @Test
public void testUnionOnSchema3Inputs() throws Exception {
PigServer pig = new PigServer(Util.getLocalTestMode());
String query =