You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2018/11/30 21:11:38 UTC

svn commit: r1847856 - in /pig/trunk: CHANGES.txt src/org/apache/pig/newplan/logical/relational/LOUnion.java test/org/apache/pig/test/TestNewPlanColumnPrune.java

Author: knoguchi
Date: Fri Nov 30 21:11:38 2018
New Revision: 1847856

URL: http://svn.apache.org/viewvc?rev=1847856&view=rev
Log:
PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1847856&r1=1847855&r2=1847856&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Nov 30 21:11:38 2018
@@ -88,6 +88,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
+
 PIG-5362: Parameter substitution of shell cmd results doesn't handle backslash (wlauer via rohini)
 
 PIG-5355: Negative progress report by HBaseTableRecordReader (satishsaley via knoguchi)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1847856&r1=1847855&r2=1847856&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Fri Nov 30 21:11:38 2018
@@ -34,9 +34,11 @@ import org.apache.pig.newplan.logical.re
 
 public class LOUnion extends LogicalRelationalOperator {
     private boolean onSchema;
+
+    private static String UID_SEPARATOR = "_";
     
     // uid mapping from output uid to input uid
-    private List<Pair<Long, Long>> uidMapping = new ArrayList<Pair<Long, Long>>();
+    private List<Pair<Long, String>> uidMapping = new ArrayList<Pair<Long, String>>();
     
     public LOUnion(OperatorPlan plan) {
         super("LOUnion", plan);
@@ -108,7 +110,7 @@ public class LOUnion extends LogicalRela
         }
 
         // Bring back cached uid if any; otherwise, cache uid generated
-        setMergedSchemaUids(mergedSchema, inputSchemas);
+        setMergedSchemaUids(mergedSchema, inputSchemas, "");
 
         return schema = mergedSchema;
     }
@@ -145,7 +147,7 @@ public class LOUnion extends LogicalRela
         return mergedSchema;
     }
 
-    private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas)
+    private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas, String nested_uids)
     throws FrontendException {
 
         for (int i=0;i<mergedSchema.size();i++) {
@@ -170,7 +172,7 @@ public class LOUnion extends LogicalRela
                     }
 
                     if (uid < 0) {
-                        uid = getCachedOuputUid(inputFieldSchema.uid);
+                        uid = getCachedOuputUid(createNestedUids(nested_uids,inputFieldSchema.uid));
                         if (uid >= 0 && outputFieldSchema.schema == null) break;
                     }
                 }
@@ -186,13 +188,13 @@ public class LOUnion extends LogicalRela
                         matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
                         if (matchedInputFieldSchema!=null) {
                             inputUid = matchedInputFieldSchema.uid;
-                            uidMapping.add(new Pair<Long, Long>(uid, inputUid));
+                            uidMapping.add(new Pair<Long, String>(uid, createNestedUids(nested_uids,inputUid)));
                         }
                     }
                     else {
                         matchedInputFieldSchema = mergedSchema.getField(i);
                         inputUid = inputSchema.getField(i).uid;
-                        uidMapping.add(new Pair<Long, Long>(uid, inputUid));
+                        uidMapping.add(new Pair<Long, String>(uid, createNestedUids(nested_uids,inputUid)));
                     }
                 }
             }
@@ -201,16 +203,28 @@ public class LOUnion extends LogicalRela
 
             // This field has a schema. Assign uids to it as well
             if (outputFieldSchema.schema != null) {
-                setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas);
+                setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas, createNestedUids(nested_uids,outputFieldSchema.uid));
             }
         }
     }
 
-    private long getCachedOuputUid(long inputUid) {
+    private String createNestedUids(String nested_uids, long new_uid) {
+        StringBuilder sb = new StringBuilder(nested_uids);
+        sb.append(UID_SEPARATOR);
+        sb.append(new_uid);
+        return sb.toString();
+    }
+
+    private long getLeafUid(String nested_uids) {
+        String [] uid_root_to_leaf = nested_uids.split(UID_SEPARATOR);
+        return Long.valueOf(uid_root_to_leaf[uid_root_to_leaf.length-1]);
+    }
+
+    private long getCachedOuputUid(String nested_input_uids) {
         long uid = -1;
         
-        for (Pair<Long, Long> pair : uidMapping) {
-            if (pair.second==inputUid) {
+        for (Pair<Long, String> pair : uidMapping) {
+            if (pair.second.equals(nested_input_uids)) {
                 uid = pair.first;
                 break;
             }
@@ -237,18 +251,18 @@ public class LOUnion extends LogicalRela
     }
 
     // Get input uids mapping to the output uid
-    public Set<Long> getInputUids(long uid) {
+    public Set<Long> getInputUids(long outputuid) {
         Set<Long> result = new HashSet<Long>();
-        for (Pair<Long, Long> pair : uidMapping) {
-            if (pair.first==uid)
-                result.add(pair.second);
+        for (Pair<Long, String> pair : uidMapping) {
+            if (pair.first==outputuid)
+                result.add(getLeafUid(pair.second));
         }
         return result;
     }
     
     @Override
     public void resetUid() {
-        uidMapping = new ArrayList<Pair<Long, Long>>();
+        uidMapping = new ArrayList<Pair<Long, String>>();
     }
     
     public List<Operator> getInputs() {

Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java?rev=1847856&r1=1847855&r2=1847856&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java Fri Nov 30 21:11:38 2018
@@ -482,6 +482,35 @@ public class TestNewPlanColumnPrune {
         }
     }
 
+    @Test
+    public void testUnionOnschemaWithInnerBag() throws Exception  {
+        // After handling inner-bag in Union-onschema,
+        // ColumnPrune broke due to overlapping uid inside the relation and
+        // ones inside the inner-bag  (PIG-5370)
+        String query = "A0 = load 'd.txt' as (a0:int, a1:int, a2:int, a3:int);" +
+        "A = FOREACH A0 GENERATE a0, a1, a2;" +
+        "B = FOREACH (GROUP A by (a0,a1)) {" +
+        "    A_FOREACH = FOREACH A GENERATE a1,a2;" +
+        "    GENERATE A, FLATTEN(A_FOREACH) as (a1,a2);" +
+        "}" +
+        "C = load 'd2.txt' as (A:bag{tuple:(a0:int, a1:int, a2:int)}, a1:int,a2:int);" +
+        "Z = UNION ONSCHEMA B, C;"  +
+        "store Z into 'empty';";
+
+        LogicalPlan newLogicalPlan = buildPlan(query);
+
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        System.err.println(newLogicalPlan);
+        Iterator<Operator> iter = newLogicalPlan.getOperators();
+        while (iter.hasNext()) {
+            Operator o = iter.next();
+            LogicalRelationalOperator lro = (LogicalRelationalOperator)o;
+            if (lro == null || lro.getAlias() == null) continue;
+            assertNotNull(lro.getSchema());
+        }
+    }
+
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
 
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
@@ -505,4 +534,3 @@ public class TestNewPlanColumnPrune {
         }
     }
 }
-