You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/12/18 01:19:22 UTC

svn commit: r1423231 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/newplan/logical/optimizer/ test/org/apache/pig/test/

Author: jcoveney
Date: Tue Dec 18 00:19:20 2012
New Revision: 1423231

URL: http://svn.apache.org/viewvc?rev=1423231&view=rev
Log:
PIG-3020: "Duplicate uid in schema" error when joining two relations derived from the same load statement (jcoveney)

Added:
    pig/trunk/test/org/apache/pig/newplan/logical/optimizer/
    pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
    pig/trunk/test/org/apache/pig/test/TestJoin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1423231&r1=1423230&r2=1423231&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Dec 18 00:19:20 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-3020: "Duplicate uid in schema" error when joining two relations derived from the same load statement (jcoveney)
+
 PIG-2857: Add a -tagPath option to PigStorage (prkommireddi via cheolsoo)
 
 PIG-2341: Need better documentation on Pig/HBase integration (jthakrar and billgraham via billgraham)

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1423231&r1=1423230&r2=1423231&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Tue Dec 18 00:19:20 2012
@@ -114,13 +114,16 @@ public class Main {
        Attributes attr=null;
        try {
             String findContainingJar = JarManager.findContainingJar(Main.class);
-            JarFile jar = new JarFile(findContainingJar);
-            final Manifest manifest = jar.getManifest();
-            final Map<String,Attributes> attrs = manifest.getEntries();
-            attr = attrs.get("org/apache/pig");
+            if (findContainingJar != null) {
+                JarFile jar = new JarFile(findContainingJar);
+                final Manifest manifest = jar.getManifest();
+                final Map<String,Attributes> attrs = manifest.getEntries();
+                attr = attrs.get("org/apache/pig");
+            } else {
+                log.info("Unable to read pigs manifest file as we are not running from a jar, version information unavailable");
+            }
         } catch (Exception e) {
-            log.warn("Unable to read pigs manifest file, version information unavailable");
-            log.warn("Exception: "+e);
+            log.warn("Unable to read pigs manifest file, version information unavailable", e);
         }
         if (attr!=null) {
             version = attr.getValue("Implementation-Version");
@@ -491,8 +494,8 @@ static int run(String args[], PigProgres
                 if (i != 0) sb.append(' ');
                 sb.append(remainders[i]);
             }
-            
-            sb.append('\n'); 
+
+            sb.append('\n');
 
             scriptState.setScript(sb.toString());
 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1423231&r1=1423230&r2=1423231&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Tue Dec 18 00:19:20 2012
@@ -21,12 +21,17 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+import com.google.common.collect.Sets;
 
 
 public class LOJoin extends LogicalRelationalOperator {
@@ -43,7 +48,7 @@ public class LOJoin extends LogicalRelat
         MERGESPARSE   // Sort Merge Index Join
     };
 
-    
+
     /**
      * LOJoin contains a list of logical operators corresponding to the
      * relational operators and a list of generates for each relational
@@ -51,21 +56,21 @@ public class LOJoin extends LogicalRelat
      * for the columns that are projected
      */
     //private static Log log = LogFactory.getLog(LOJoin.class);
-    // expression plans for each input. 
+    // expression plans for each input.
     private MultiMap<Integer, LogicalExpressionPlan> mJoinPlans;
     // indicator for each input whether it is inner
     private boolean[] mInnerFlags;
     private JOINTYPE mJoinType; // Retains the type of the join
-    
-    /** 
+
+    /**
      * static constant to refer to the option of selecting a join type
      */
     public final static Integer OPTION_JOIN = 1;
-    
+
     public LOJoin(LogicalPlan plan) {
-        super("LOJoin", plan);     
+        super("LOJoin", plan);
     }
-    
+
     public LOJoin(LogicalPlan plan,
                 MultiMap<Integer, LogicalExpressionPlan> joinPlans,
                 JOINTYPE jt,
@@ -75,15 +80,15 @@ public class LOJoin extends LogicalRelat
         mJoinType = jt;
         mInnerFlags = isInner;
     }
-    
+
     public void setJoinPlans(MultiMap<Integer, LogicalExpressionPlan> joinPlans) {
         this.mJoinPlans = joinPlans;
     }
-    
+
     public void setInnerFlags(boolean[] isInner) {
         this.mInnerFlags = isInner;
     }
-    
+
     public void setJoinType(JOINTYPE jt) {
         this.mJoinType = jt;
     }
@@ -91,23 +96,23 @@ public class LOJoin extends LogicalRelat
     public boolean isInner(int inputIndex) {
         return mInnerFlags[inputIndex];
     }
-    
+
     public boolean[] getInnerFlags() {
         return mInnerFlags;
     }
-    
+
     public JOINTYPE getJoinType() {
         return mJoinType;
     }
-    
+
     public void resetJoinType() {
         mJoinType = JOINTYPE.HASH;
     }
-    
+
     public Collection<LogicalExpressionPlan> getJoinPlan(int inputIndex) {
         return mJoinPlans.get(inputIndex);
     }
-    
+
     /**
      * Get all of the expressions plans that are in this join.
      * @return collection of all expression plans.
@@ -115,54 +120,81 @@ public class LOJoin extends LogicalRelat
     public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() {
         return mJoinPlans;
     }
-    
+
     public Collection<LogicalExpressionPlan> getExpressionPlanValues() {
         return mJoinPlans.values();
     }
-    
+
     @Override
     public LogicalSchema getSchema() throws FrontendException {
         // if schema is calculated before, just return
         if (schema != null) {
             return schema;
         }
-        
+
         List<Operator> inputs = null;
         inputs = plan.getPredecessors(this);
         if (inputs == null) {
             return null;
         }
-        
+
         List<LogicalSchema.LogicalFieldSchema> fss = new ArrayList<LogicalSchema.LogicalFieldSchema>();
-        
+
         for (Operator op : inputs) {
             LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
-            // the schema of one input is unknown, so the join schema is unknown, just return 
+            // the schema of one input is unknown, so the join schema is unknown, just return
             if (inputSchema == null) {
                 schema = null;
                 return schema;
             }
-                               
+
             for (int i=0; i<inputSchema.size(); i++) {
                  LogicalSchema.LogicalFieldSchema fs = inputSchema.getField(i);
                  LogicalSchema.LogicalFieldSchema newFS = null;
-                 if(fs.alias != null) {                    
-                     newFS = new LogicalSchema.LogicalFieldSchema(((LogicalRelationalOperator)op).getAlias()+"::"+fs.alias ,fs.schema, fs.type, fs.uid);                    
+                 if(fs.alias != null) {
+                     newFS = new LogicalSchema.LogicalFieldSchema(((LogicalRelationalOperator)op).getAlias()+"::"+fs.alias ,fs.schema, fs.type, fs.uid);
                  } else {
                      newFS = new LogicalSchema.LogicalFieldSchema(fs.alias, fs.schema, fs.type, fs.uid);
-                 }                       
-                 fss.add(newFS);                 
-            }            
-        }        
+                 }
+                 fss.add(newFS);
+            }
+        }
+
+        fixDuplicateUids(fss);
 
         schema = new LogicalSchema();
         for(LogicalSchema.LogicalFieldSchema fieldSchema: fss) {
             schema.addField(fieldSchema);
-        }         
-        
+        }
+
         return schema;
     }
-    
+
+    /**
+     * In the case of a join it is possible for multiple columns to have been derived from the same
+     * column and thus have duplicate UIDs. This detects that case and resets the UID.
+     * See PIG-3020 and PIG-3093 for more information.
+     * @param fss a list of LogicalFieldSchemas to check the uids of
+     */
+    private void fixDuplicateUids(List<LogicalFieldSchema> fss) {
+        Set<Long> uids = Sets.newHashSet();
+        for (LogicalFieldSchema lfs : fss) {
+            addFieldSchemaUidsToSet(uids, lfs);
+        }
+    }
+
+    private void addFieldSchemaUidsToSet(Set<Long> uids, LogicalFieldSchema lfs) {
+        while (!uids.add(lfs.uid)) {
+            lfs.uid = LogicalExpression.getNextUid();
+        }
+        LogicalSchema ls = lfs.schema;
+        if (ls != null) {
+            for (LogicalFieldSchema lfs2 : ls.getFields()) {
+                addFieldSchemaUidsToSet(uids, lfs2);
+            }
+        }
+    }
+
     @Override
     public void accept(PlanVisitor v) throws FrontendException {
         if (!(v instanceof LogicalRelationalNodesVisitor)) {
@@ -171,7 +203,7 @@ public class LOJoin extends LogicalRelat
         ((LogicalRelationalNodesVisitor)v).visit(this);
 
     }
-    
+
     @Override
     public boolean isEqual(Operator other) throws FrontendException {
         if (other != null && other instanceof LOJoin) {
@@ -182,12 +214,12 @@ public class LOJoin extends LogicalRelat
                 if (mInnerFlags[i] != oj.mInnerFlags[i]) return false;
             }
             if (!checkEquality(oj)) return false;
-            
+
             if (mJoinPlans.size() != oj.mJoinPlans.size())  return false;
-            
+
             // Now, we need to make sure that for each input we are projecting
             // the same columns.  This is slightly complicated since MultiMap
-            // doesn't return any particular order, so we have to find the 
+            // doesn't return any particular order, so we have to find the
             // matching input in each case.
             for (Integer p : mJoinPlans.keySet()) {
                 Iterator<Integer> iter = oj.mJoinPlans.keySet().iterator();
@@ -200,7 +232,7 @@ public class LOJoin extends LogicalRelat
                     Collection<LogicalExpressionPlan> c = mJoinPlans.get(p);
                     Collection<LogicalExpressionPlan> oc = oj.mJoinPlans.get(op);
                     if (c.size() != oc.size()) return false;
-                    
+
                     if (!(c instanceof List) || !(oc instanceof List)) {
                         throw new FrontendException(
                             "Expected list of expression plans", 2238);
@@ -219,12 +251,12 @@ public class LOJoin extends LogicalRelat
             return false;
         }
     }
-    
+
     @Override
     public String getName() {
         return name + "(" + mJoinType.toString() + ")";
     }
-    
+
     public List<Operator> getInputs(LogicalPlan plan) {
         return plan.getPredecessors(this);
     }

Added: pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java?rev=1423231&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java (added)
+++ pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java Tue Dec 18 00:19:20 2012
@@ -0,0 +1,82 @@
+package org.apache.pig.newplan.logical.optimizer;
+
+import static org.apache.pig.ExecType.LOCAL;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ *
+ * See: https://issues.apache.org/jira/browse/PIG-3020
+ *
+ */
+public class TestSchemaResetter {
+
+    @Test
+    public void testSchemaResetter() throws IOException {
+        new File("build/test/tmp/").mkdirs();
+        Util.createLocalInputFile("build/test/tmp/TestSchemaResetter.pig", new String[] {
+                "A = LOAD 'foo' AS (group:tuple(uid, dst_id));",
+                "edges_both = FOREACH A GENERATE",
+                "    group.uid AS src_id,",
+                "    group.dst_id AS dst_id;",
+                "both_counts = GROUP edges_both BY src_id;",
+                "both_counts = FOREACH both_counts GENERATE",
+                "    group AS src_id, SIZE(edges_both) AS size_both;",
+                "",
+                "edges_bq = FOREACH A GENERATE",
+                "    group.uid AS src_id,",
+                "    group.dst_id AS dst_id;",
+                "bq_counts = GROUP edges_bq BY src_id;",
+                "bq_counts = FOREACH bq_counts GENERATE",
+                "    group AS src_id, SIZE(edges_bq) AS size_bq;",
+                "",
+                "per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;",
+                "store per_user_set_sizes into  'foo';"
+                });
+        assertEquals(0, PigRunner.run(new String[] {"-x", "local", "-c", "build/test/tmp/TestSchemaResetter.pig" } , null).getReturnCode());
+    }
+
+    @Test
+    public void testSchemaResetterExec() throws IOException {
+        PigServer pigServer = new PigServer(LOCAL);
+        Data data = Storage.resetData(pigServer);
+        data.set("input",
+                tuple(tuple("1", "2")),
+                tuple(tuple("2", "3")),
+                tuple(tuple("2", "4")));
+        pigServer.registerQuery(
+                "A = LOAD 'input' USING mock.Storage() AS (group:tuple(uid, dst_id));" +
+                "edges_both = FOREACH A GENERATE" +
+                "    group.uid AS src_id," +
+                "    group.dst_id AS dst_id;" +
+                "both_counts = GROUP edges_both BY src_id;" +
+                "both_counts = FOREACH both_counts GENERATE" +
+                "    group AS src_id, SIZE(edges_both) AS size_both;" +
+                "edges_bq = FOREACH A GENERATE" +
+                "    group.uid AS src_id," +
+                "    group.dst_id AS dst_id;" +
+                "bq_counts = GROUP edges_bq BY src_id;" +
+                "bq_counts = FOREACH bq_counts GENERATE" +
+                "    group AS src_id, SIZE(edges_bq) AS size_bq;" +
+                "per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;" +
+                "store per_user_set_sizes into 'output' USING mock.Storage();");
+        List<Tuple> list = data.get("output");
+        Collections.sort(list);
+        assertEquals("list: "+list, 2, list.size());
+        assertEquals("(1,1,1,1)", list.get(0).toString());
+        assertEquals("(2,2,2,2)", list.get(1).toString());
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=1423231&r1=1423230&r2=1423231&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJoin.java Tue Dec 18 00:19:20 2012
@@ -18,6 +18,8 @@
 
 package org.apache.pig.test;
 
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -28,11 +30,13 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -40,6 +44,7 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
@@ -49,6 +54,8 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 /**
  * Test cases to test join statement
  */
@@ -692,5 +699,29 @@ public class TestJoin {
         LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
         assertEquals(JOINTYPE.REPLICATED, join.getJoinType());
     }
-}
 
+    // See: https://issues.apache.org/jira/browse/PIG-3093
+    @Test
+    public void testIndirectSelfJoinRealias() throws Exception {
+        setUp(ExecType.LOCAL);
+        Data data = resetData(pigServer);
+
+        Set<Tuple> tuples = Sets.newHashSet(tuple("a"), tuple("b"), tuple("c"));
+        data.set("foo", Utils.getSchemaFromString("field1:chararray"), tuples);
+        pigServer.registerQuery("A = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("B = foreach A generate *;");
+        pigServer.registerQuery("C = join A by field1, B by field1;");
+        assertEquals(Utils.getSchemaFromString("A::field1:chararray, B::field1:chararray"), pigServer.dumpSchema("C"));
+        pigServer.registerQuery("D = foreach C generate B::field1, A::field1 as field2;");
+        assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("D"));
+        pigServer.registerQuery("E = foreach D generate field1, field2;");
+        assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("E"));
+        pigServer.registerQuery("F = foreach E generate field2;");
+        Iterator<Tuple> it = pigServer.openIterator("F");
+        assertTrue(it.hasNext());
+        while (it.hasNext()) {
+            assertTrue(tuples.remove(it.next()));
+        }
+        assertFalse(it.hasNext());
+    }
+}
\ No newline at end of file