You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/12/18 03:39:41 UTC

svn commit: r1423259 - in /pig/branches/branch-0.11: ./ src/org/apache/pig/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/newplan/logical/optimizer/ test/org/apache/pig/test/

Author: jcoveney
Date: Tue Dec 18 02:39:38 2012
New Revision: 1423259

URL: http://svn.apache.org/viewvc?rev=1423259&view=rev
Log:
PIG-3020: "Duplicate uid in schema" error when joining two relations derived from the same load statement (jcoveney)

Added:
    pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/
    pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java
Modified:
    pig/branches/branch-0.11/CHANGES.txt
    pig/branches/branch-0.11/src/org/apache/pig/Main.java
    pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java
    pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java

Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1423259&r1=1423258&r2=1423259&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Tue Dec 18 02:39:38 2012
@@ -332,6 +332,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-3020: "Duplicate uid in schema" error when joining two relations derived from the same load statement (jcoveney)
+
 PIG-3044: hotfix to remove divide by 0 error (jcoveney)
 
 PIG-3033: test-patch failed with javadoc warnings (fang fang chen via cheolsoo)

Modified: pig/branches/branch-0.11/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/Main.java?rev=1423259&r1=1423258&r2=1423259&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/Main.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/Main.java Tue Dec 18 02:39:38 2012
@@ -114,13 +114,16 @@ public class Main {
        Attributes attr=null;
        try {
             String findContainingJar = JarManager.findContainingJar(Main.class);
-            JarFile jar = new JarFile(findContainingJar);
-            final Manifest manifest = jar.getManifest();
-            final Map<String,Attributes> attrs = manifest.getEntries();
-            attr = attrs.get("org/apache/pig");
+            if (findContainingJar != null) {
+                JarFile jar = new JarFile(findContainingJar);
+                final Manifest manifest = jar.getManifest();
+                final Map<String,Attributes> attrs = manifest.getEntries();
+                attr = attrs.get("org/apache/pig");
+            } else {
+                log.info("Unable to read pigs manifest file as we are not running from a jar, version information unavailable");
+            }
         } catch (Exception e) {
-            log.warn("Unable to read pigs manifest file, version information unavailable");
-            log.warn("Exception: "+e);
+            log.warn("Unable to read pigs manifest file, version information unavailable", e);
         }
         if (attr!=null) {
             version = attr.getValue("Implementation-Version");
@@ -491,8 +494,8 @@ static int run(String args[], PigProgres
                 if (i != 0) sb.append(' ');
                 sb.append(remainders[i]);
             }
-            
-            sb.append('\n'); 
+
+            sb.append('\n');
 
             scriptState.setScript(sb.toString());
 

Modified: pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1423259&r1=1423258&r2=1423259&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java Tue Dec 18 02:39:38 2012
@@ -21,12 +21,17 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+import com.google.common.collect.Sets;
 
 
 public class LOJoin extends LogicalRelationalOperator {
@@ -43,7 +48,7 @@ public class LOJoin extends LogicalRelat
         MERGESPARSE   // Sort Merge Index Join
     };
 
-    
+
     /**
      * LOJoin contains a list of logical operators corresponding to the
      * relational operators and a list of generates for each relational
@@ -51,21 +56,21 @@ public class LOJoin extends LogicalRelat
      * for the columns that are projected
      */
     //private static Log log = LogFactory.getLog(LOJoin.class);
-    // expression plans for each input. 
+    // expression plans for each input.
     private MultiMap<Integer, LogicalExpressionPlan> mJoinPlans;
     // indicator for each input whether it is inner
     private boolean[] mInnerFlags;
     private JOINTYPE mJoinType; // Retains the type of the join
-    
-    /** 
+
+    /**
      * static constant to refer to the option of selecting a join type
      */
     public final static Integer OPTION_JOIN = 1;
-    
+
     public LOJoin(LogicalPlan plan) {
-        super("LOJoin", plan);     
+        super("LOJoin", plan);
     }
-    
+
     public LOJoin(LogicalPlan plan,
                 MultiMap<Integer, LogicalExpressionPlan> joinPlans,
                 JOINTYPE jt,
@@ -75,15 +80,15 @@ public class LOJoin extends LogicalRelat
         mJoinType = jt;
         mInnerFlags = isInner;
     }
-    
+
     public void setJoinPlans(MultiMap<Integer, LogicalExpressionPlan> joinPlans) {
         this.mJoinPlans = joinPlans;
     }
-    
+
     public void setInnerFlags(boolean[] isInner) {
         this.mInnerFlags = isInner;
     }
-    
+
     public void setJoinType(JOINTYPE jt) {
         this.mJoinType = jt;
     }
@@ -91,23 +96,23 @@ public class LOJoin extends LogicalRelat
     public boolean isInner(int inputIndex) {
         return mInnerFlags[inputIndex];
     }
-    
+
     public boolean[] getInnerFlags() {
         return mInnerFlags;
     }
-    
+
     public JOINTYPE getJoinType() {
         return mJoinType;
     }
-    
+
     public void resetJoinType() {
         mJoinType = JOINTYPE.HASH;
     }
-    
+
     public Collection<LogicalExpressionPlan> getJoinPlan(int inputIndex) {
         return mJoinPlans.get(inputIndex);
     }
-    
+
     /**
      * Get all of the expressions plans that are in this join.
      * @return collection of all expression plans.
@@ -115,54 +120,81 @@ public class LOJoin extends LogicalRelat
     public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() {
         return mJoinPlans;
     }
-    
+
     public Collection<LogicalExpressionPlan> getExpressionPlanValues() {
         return mJoinPlans.values();
     }
-    
+
     @Override
     public LogicalSchema getSchema() throws FrontendException {
         // if schema is calculated before, just return
         if (schema != null) {
             return schema;
         }
-        
+
         List<Operator> inputs = null;
         inputs = plan.getPredecessors(this);
         if (inputs == null) {
             return null;
         }
-        
+
         List<LogicalSchema.LogicalFieldSchema> fss = new ArrayList<LogicalSchema.LogicalFieldSchema>();
-        
+
         for (Operator op : inputs) {
             LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
-            // the schema of one input is unknown, so the join schema is unknown, just return 
+            // the schema of one input is unknown, so the join schema is unknown, just return
             if (inputSchema == null) {
                 schema = null;
                 return schema;
             }
-                               
+
             for (int i=0; i<inputSchema.size(); i++) {
                  LogicalSchema.LogicalFieldSchema fs = inputSchema.getField(i);
                  LogicalSchema.LogicalFieldSchema newFS = null;
-                 if(fs.alias != null) {                    
-                     newFS = new LogicalSchema.LogicalFieldSchema(((LogicalRelationalOperator)op).getAlias()+"::"+fs.alias ,fs.schema, fs.type, fs.uid);                    
+                 if(fs.alias != null) {
+                     newFS = new LogicalSchema.LogicalFieldSchema(((LogicalRelationalOperator)op).getAlias()+"::"+fs.alias ,fs.schema, fs.type, fs.uid);
                  } else {
                      newFS = new LogicalSchema.LogicalFieldSchema(fs.alias, fs.schema, fs.type, fs.uid);
-                 }                       
-                 fss.add(newFS);                 
-            }            
-        }        
+                 }
+                 fss.add(newFS);
+            }
+        }
+
+        fixDuplicateUids(fss);
 
         schema = new LogicalSchema();
         for(LogicalSchema.LogicalFieldSchema fieldSchema: fss) {
             schema.addField(fieldSchema);
-        }         
-        
+        }
+
         return schema;
     }
-    
+
+    /**
+     * In the case of a join it is possible for multiple columns to have been derived from the same
+     * column and thus have duplicate UID's. This detects that case and resets the uid.
+     * See PIG-3022 and PIG-3093 for more information.
+     * @param fss a list of LogicalFieldSchemas to check the uids of
+     */
+    private void fixDuplicateUids(List<LogicalFieldSchema> fss) {
+        Set<Long> uids = Sets.newHashSet();
+        for (LogicalFieldSchema lfs : fss) {
+            addFieldSchemaUidsToSet(uids, lfs);
+        }
+    }
+
+    private void addFieldSchemaUidsToSet(Set<Long> uids, LogicalFieldSchema lfs) {
+        while (!uids.add(lfs.uid)) {
+            lfs.uid = LogicalExpression.getNextUid();
+        }
+        LogicalSchema ls = lfs.schema;
+        if (ls != null) {
+            for (LogicalFieldSchema lfs2 : ls.getFields()) {
+                addFieldSchemaUidsToSet(uids, lfs2);
+            }
+        }
+    }
+
     @Override
     public void accept(PlanVisitor v) throws FrontendException {
         if (!(v instanceof LogicalRelationalNodesVisitor)) {
@@ -171,7 +203,7 @@ public class LOJoin extends LogicalRelat
         ((LogicalRelationalNodesVisitor)v).visit(this);
 
     }
-    
+
     @Override
     public boolean isEqual(Operator other) throws FrontendException {
         if (other != null && other instanceof LOJoin) {
@@ -182,12 +214,12 @@ public class LOJoin extends LogicalRelat
                 if (mInnerFlags[i] != oj.mInnerFlags[i]) return false;
             }
             if (!checkEquality(oj)) return false;
-            
+
             if (mJoinPlans.size() != oj.mJoinPlans.size())  return false;
-            
+
             // Now, we need to make sure that for each input we are projecting
             // the same columns.  This is slightly complicated since MultiMap
-            // doesn't return any particular order, so we have to find the 
+            // doesn't return any particular order, so we have to find the
             // matching input in each case.
             for (Integer p : mJoinPlans.keySet()) {
                 Iterator<Integer> iter = oj.mJoinPlans.keySet().iterator();
@@ -200,7 +232,7 @@ public class LOJoin extends LogicalRelat
                     Collection<LogicalExpressionPlan> c = mJoinPlans.get(p);
                     Collection<LogicalExpressionPlan> oc = oj.mJoinPlans.get(op);
                     if (c.size() != oc.size()) return false;
-                    
+
                     if (!(c instanceof List) || !(oc instanceof List)) {
                         throw new FrontendException(
                             "Expected list of expression plans", 2238);
@@ -219,12 +251,12 @@ public class LOJoin extends LogicalRelat
             return false;
         }
     }
-    
+
     @Override
     public String getName() {
         return name + "(" + mJoinType.toString() + ")";
     }
-    
+
     public List<Operator> getInputs(LogicalPlan plan) {
         return plan.getPredecessors(this);
     }

Added: pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java?rev=1423259&view=auto
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java (added)
+++ pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java Tue Dec 18 02:39:38 2012
@@ -0,0 +1,82 @@
+package org.apache.pig.newplan.logical.optimizer;
+
+import static org.apache.pig.ExecType.LOCAL;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ *
+ * See: https://issues.apache.org/jira/browse/PIG-3020
+ *
+ */
+public class TestSchemaResetter {
+
+    @Test
+    public void testSchemaResetter() throws IOException {
+        new File("build/test/tmp/").mkdirs();
+        Util.createLocalInputFile("build/test/tmp/TestSchemaResetter.pig", new String[] {
+                "A = LOAD 'foo' AS (group:tuple(uid, dst_id));",
+                "edges_both = FOREACH A GENERATE",
+                "    group.uid AS src_id,",
+                "    group.dst_id AS dst_id;",
+                "both_counts = GROUP edges_both BY src_id;",
+                "both_counts = FOREACH both_counts GENERATE",
+                "    group AS src_id, SIZE(edges_both) AS size_both;",
+                "",
+                "edges_bq = FOREACH A GENERATE",
+                "    group.uid AS src_id,",
+                "    group.dst_id AS dst_id;",
+                "bq_counts = GROUP edges_bq BY src_id;",
+                "bq_counts = FOREACH bq_counts GENERATE",
+                "    group AS src_id, SIZE(edges_bq) AS size_bq;",
+                "",
+                "per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;",
+                "store per_user_set_sizes into  'foo';"
+                });
+        assertEquals(0, PigRunner.run(new String[] {"-x", "local", "-c", "build/test/tmp/TestSchemaResetter.pig" } , null).getReturnCode());
+    }
+
+    @Test
+    public void testSchemaResetterExec() throws IOException {
+        PigServer pigServer = new PigServer(LOCAL);
+        Data data = Storage.resetData(pigServer);
+        data.set("input",
+                tuple(tuple("1", "2")),
+                tuple(tuple("2", "3")),
+                tuple(tuple("2", "4")));
+        pigServer.registerQuery(
+                "A = LOAD 'input' USING mock.Storage() AS (group:tuple(uid, dst_id));" +
+                "edges_both = FOREACH A GENERATE" +
+                "    group.uid AS src_id," +
+                "    group.dst_id AS dst_id;" +
+                "both_counts = GROUP edges_both BY src_id;" +
+                "both_counts = FOREACH both_counts GENERATE" +
+                "    group AS src_id, SIZE(edges_both) AS size_both;" +
+                "edges_bq = FOREACH A GENERATE" +
+                "    group.uid AS src_id," +
+                "    group.dst_id AS dst_id;" +
+                "bq_counts = GROUP edges_bq BY src_id;" +
+                "bq_counts = FOREACH bq_counts GENERATE" +
+                "    group AS src_id, SIZE(edges_bq) AS size_bq;" +
+                "per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;" +
+                "store per_user_set_sizes into 'output' USING mock.Storage();");
+        List<Tuple> list = data.get("output");
+        Collections.sort(list);
+        assertEquals("list: "+list, 2, list.size());
+        assertEquals("(1,1,1,1)", list.get(0).toString());
+        assertEquals("(2,2,2,2)", list.get(1).toString());
+    }
+}

Modified: pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java?rev=1423259&r1=1423258&r2=1423259&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java (original)
+++ pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java Tue Dec 18 02:39:38 2012
@@ -18,16 +18,21 @@
 
 package org.apache.pig.test;
 
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -35,6 +40,7 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
@@ -48,31 +54,33 @@ import org.junit.runners.JUnit4;
 
 import junit.framework.TestCase;
 
+import com.google.common.collect.Sets;
+
 /**
  * Test cases to test join statement
  */
 
 @RunWith(JUnit4.class)
 public class TestJoin extends TestCase {
-    
+
     static MiniCluster cluster;
     private PigServer pigServer;
 
     TupleFactory mTf = TupleFactory.getInstance();
     BagFactory mBf = BagFactory.getInstance();
     ExecType[] execTypes = new ExecType[] {ExecType.LOCAL, ExecType.MAPREDUCE};
-    
+
     @Before
     @Override
     public void setUp() throws Exception{
         FileLocalizer.setR(new Random());
     }
-    
+
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
     private void setUp(ExecType execType) throws ExecException {
         // cause a reinitialization of FileLocalizer's
         // internal state
@@ -81,10 +89,10 @@ public class TestJoin extends TestCase {
             cluster =  MiniCluster.buildCluster();
             pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         } else if(execType == ExecType.LOCAL) {
-            pigServer = new PigServer(ExecType.LOCAL);       
+            pigServer = new PigServer(ExecType.LOCAL);
         }
     }
-    
+
     private String createInputFile(ExecType execType, String fileNameHint, String[] data) throws IOException {
         String fileName = "";
         if(execType == ExecType.MAPREDUCE) {
@@ -96,7 +104,7 @@ public class TestJoin extends TestCase {
         }
         return fileName;
     }
-    
+
     private void deleteInputFile(ExecType execType, String fileName) throws IOException {
         if(execType == ExecType.MAPREDUCE) {
             Util.deleteFile(cluster, fileName);
@@ -126,7 +134,7 @@ public class TestJoin extends TestCase {
                 "",
                 ""
                 };
-        
+
         String firstInput = createInputFile(ExecType.MAPREDUCE, "a.txt", input1);
         String secondInput = createInputFile(ExecType.MAPREDUCE, "b.txt", input2);
         String script = "a = load 'a.txt'  using PigStorage(' ');" +
@@ -138,7 +146,7 @@ public class TestJoin extends TestCase {
         deleteInputFile(ExecType.MAPREDUCE, firstInput);
         deleteInputFile(ExecType.MAPREDUCE, secondInput);
     }
-    
+
     @Test
     public void testJoinUnkownSchema() throws Exception {
         // If any of the input schema is unknown, the resulting schema should be unknown as well
@@ -167,11 +175,11 @@ public class TestJoin extends TestCase {
                     "good\tmorning",
                     "\tevening"
             };
-            
+
             String firstInput = createInputFile(execType, "a.txt", input1);
             String secondInput = createInputFile(execType, "b.txt", input2);
             Tuple expectedResult = (Tuple)Util.getPigConstant("('hello',1,'hello','world')");
-            
+
             // with schema
             String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (n:chararray, a:int); " +
             		"b = load '"+ Util.encodeEscape(secondInput) +"' as (n:chararray, m:chararray); " +
@@ -181,7 +189,7 @@ public class TestJoin extends TestCase {
             assertEquals(true, it.hasNext());
             assertEquals(expectedResult, it.next());
             assertEquals(false, it.hasNext());
-            
+
             // without schema
             script = "a = load '"+ Util.encodeEscape(firstInput) + "'; " +
             "b = load '" + Util.encodeEscape(secondInput) + "'; " +
@@ -195,8 +203,8 @@ public class TestJoin extends TestCase {
             deleteInputFile(execType, secondInput);
         }
     }
-    
-    
+
+
     @Test
     public void testJoinSchema() throws Exception {
         for (ExecType execType : execTypes) {
@@ -210,11 +218,11 @@ public class TestJoin extends TestCase {
                     "1\thello",
                     "4\tbye",
             };
-            
+
             String firstInput = createInputFile(execType, "a.txt", input1);
             String secondInput = createInputFile(execType, "b.txt", input2);
             Tuple expectedResult = (Tuple)Util.getPigConstant("(1,2,1,'hello',1,2,1,'hello')");
-            
+
             // with schema
             String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (i:int, j:int); " +
                     "b = load '"+ Util.encodeEscape(secondInput) +"' as (k:int, l:chararray); " +
@@ -225,7 +233,7 @@ public class TestJoin extends TestCase {
             assertEquals(true, it.hasNext());
             assertEquals(expectedResult, it.next());
             assertEquals(false, it.hasNext());
-            
+
             // schema with duplicates
             script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (i:int, j:int); " +
             "b = load '"+ Util.encodeEscape(secondInput) +"' as (i:int, l:chararray); " +
@@ -241,7 +249,7 @@ public class TestJoin extends TestCase {
                 exceptionThrown = true;
             }
             assertEquals(true, exceptionThrown);
-            
+
             // schema with duplicates with resolution
             script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (i:int, j:int); " +
             "b = load '"+ Util.encodeEscape(secondInput) +"' as (i:int, l:chararray); " +
@@ -256,7 +264,7 @@ public class TestJoin extends TestCase {
             deleteInputFile(execType, secondInput);
         }
     }
-    
+
     @Test
     public void testJoinSchema2() throws Exception {
         // test join where one load does not have schema
@@ -271,20 +279,20 @@ public class TestJoin extends TestCase {
                 "1\thello",
                 "4\tbye",
         };
-        
+
         String firstInput = createInputFile(execType, "a.txt", input1);
         String secondInput = createInputFile(execType, "b.txt", input2);
         Tuple expectedResultCharArray =
             (Tuple)Util.getPigConstant("('1','2','1','hello','1','2','1','hello')");
-        
+
         Tuple expectedResult = TupleFactory.getInstance().newTuple();
         for(Object field : expectedResultCharArray.getAll()){
             expectedResult.append(new DataByteArray(field.toString()));
         }
-        
+
         // with schema
         String script = "a = load '"+ Util.encodeEscape(firstInput) +"' ; " +
-        //re-using alias a for new operator below, doing this intentionally 
+        //re-using alias a for new operator below, doing this intentionally
         // because such use case has been seen
         "a = foreach a generate $0 as i, $1 as j ;" +
         "b = load '"+ Util.encodeEscape(secondInput) +"' as (k, l); " +
@@ -298,9 +306,9 @@ public class TestJoin extends TestCase {
         assertEquals(false, it.hasNext());
         deleteInputFile(execType, firstInput);
         deleteInputFile(execType, secondInput);
-        
+
     }
-    
+
     @Test
     public void testLeftOuterJoin() throws Exception {
         for (ExecType execType : execTypes) {
@@ -314,18 +322,18 @@ public class TestJoin extends TestCase {
                     "hello\tworld",
                     "good\tmorning",
                     "\tevening"
-    
+
             };
-            
+
             String firstInput = createInputFile(execType, "a.txt", input1);
             String secondInput = createInputFile(execType, "b.txt", input2);
             List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
+                    new String[] {
                             "('hello',1,'hello','world')",
                             "('bye',2,null,null)",
                             "(null,3,null,null)"
                     });
-            
+
             // with and without optional outer
             for(int i = 0; i < 2; i++) {
                 //with schema
@@ -339,7 +347,7 @@ public class TestJoin extends TestCase {
                 script += "d = order c by $1;";
                 // ensure we parse correctly
                 Util.buildLp(pigServer, script);
-                
+
                 // run query and test results only once
                 if(i == 0) {
                     Util.registerMultiLineQuery(pigServer, script);
@@ -349,7 +357,7 @@ public class TestJoin extends TestCase {
                         assertEquals(expectedResults.get(counter++), it.next());
                     }
                     assertEquals(expectedResults.size(), counter);
-                    
+
                     // without schema
                     script = "a = load '"+ Util.encodeEscape(firstInput) +"'; " +
                     "b = load '"+ Util.encodeEscape(secondInput) +"'; ";
@@ -384,9 +392,9 @@ public class TestJoin extends TestCase {
                     "hello\tworld",
                     "good\tmorning",
                     "\tevening"
-    
+
             };
-            
+
             String firstInput = createInputFile(execType, "a.txt", input1);
             String secondInput = createInputFile(execType, "b.txt", input2);
             List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
@@ -408,7 +416,7 @@ public class TestJoin extends TestCase {
                 script += "d = order c by $3;";
                 // ensure we parse correctly
                 Util.buildLp(pigServer, script);
-                
+
                 // run query and test results only once
                 if(i == 0) {
                     Util.registerMultiLineQuery(pigServer, script);
@@ -418,7 +426,7 @@ public class TestJoin extends TestCase {
                         assertEquals(expectedResults.get(counter++), it.next());
                     }
                     assertEquals(expectedResults.size(), counter);
-                    
+
                     // without schema
                     script = "a = load '"+ Util.encodeEscape(firstInput) +"'; " +
                     "b = load '"+ Util.encodeEscape(secondInput) +"'; " ;
@@ -439,7 +447,7 @@ public class TestJoin extends TestCase {
             deleteInputFile(execType, secondInput);
         }
     }
-    
+
     @Test
     public void testFullOuterJoin() throws Exception {
         for (ExecType execType : execTypes) {
@@ -453,9 +461,9 @@ public class TestJoin extends TestCase {
                     "hello\tworld",
                     "good\tmorning",
                     "\tevening"
-    
+
             };
-            
+
             String firstInput = createInputFile(execType, "a.txt", input1);
             String secondInput = createInputFile(execType, "b.txt", input2);
             List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
@@ -479,7 +487,7 @@ public class TestJoin extends TestCase {
                 script += "d = order c by $1, $3;";
                 // ensure we parse correctly
                 Util.buildLp(pigServer, script);
-                
+
                 // run query and test results only once
                 if(i == 0) {
                     Util.registerMultiLineQuery(pigServer, script);
@@ -489,7 +497,7 @@ public class TestJoin extends TestCase {
                         assertEquals(expectedResults.get(counter++), it.next());
                     }
                     assertEquals(expectedResults.size(), counter);
-                    
+
                     // without schema
                     script = "a = load '"+ Util.encodeEscape(firstInput) +"'; " +
                     "b = load '"+ Util.encodeEscape(secondInput) +"'; " ;
@@ -510,7 +518,7 @@ public class TestJoin extends TestCase {
             deleteInputFile(execType, secondInput);
         }
     }
-    
+
     @Test
     public void testMultiOuterJoinFailure() throws ExecException {
     	setUp(ExecType.LOCAL);
@@ -521,7 +529,7 @@ public class TestJoin extends TestCase {
         for (int i = 0; i < types.length; i++) {
             boolean errCaught = false;
             try {
-            	String q = query + 
+            	String q = query +
             	           "d = join a by $0 " + types[i] + " outer, b by $0, c by $0;" +
             	           "store d into 'output';";
             	Util.buildLp(pigServer, q);
@@ -530,11 +538,11 @@ public class TestJoin extends TestCase {
                 assertTrue(e.getMessage().contains("mismatched input ',' expecting SEMI_COLON"));
             }
             assertEquals(true, errCaught);
-            
+
         }
-        
+
     }
-    
+
     @Test
     public void testNonRegularOuterJoinFailure() throws ExecException {
     	setUp(ExecType.LOCAL);
@@ -546,21 +554,21 @@ public class TestJoin extends TestCase {
             for(int j = 0; j < joinTypes.length; j++) {
                 boolean errCaught = false;
                 try {
-                	String q = query + "d = join a by $0 " + 
+                	String q = query + "d = join a by $0 " +
                     types[i] + " outer, b by $0 using '" + joinTypes[j] +"';" +
                     "store d into 'output';";
                     Util.buildLp(pigServer, q);
-                    
+
                 } catch(Exception e) {
                     errCaught = true;
                      // This after adding support of LeftOuter Join to replicated Join
-                        assertEquals(true, e.getMessage().contains("does not support (right|full) outer joins"));   
+                        assertEquals(true, e.getMessage().contains("does not support (right|full) outer joins"));
                 }
                 assertEquals( i == 0 ? false : true, errCaught);
             }
         }
     }
-    
+
     @Test
     public void testJoinTupleFieldKey() throws Exception{
         for (ExecType execType : execTypes) {
@@ -573,24 +581,24 @@ public class TestJoin extends TestCase {
                     "(1,b)",
                     "(2,bb)"
             };
-            
+
             String firstInput = createInputFile(execType, "a.txt", input1);
             String secondInput = createInputFile(execType, "b.txt", input2);
-            
+
             String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (a:tuple(a1:int, a2:chararray));" +
                     "b = load '"+ Util.encodeEscape(secondInput) +"' as (b:tuple(b1:int, b2:chararray));" +
                     "c = join a by a.a1, b by b.b1;";
             Util.registerMultiLineQuery(pigServer, script);
             Iterator<Tuple> it = pigServer.openIterator("c");
-            
+
             assertTrue(it.hasNext());
             Tuple t = it.next();
             assertTrue(t.toString().equals("((1,a),(1,b))"));
-            
+
             assertTrue(it.hasNext());
             t = it.next();
             assertTrue(t.toString().equals("((2,aa),(2,bb))"));
-            
+
             deleteInputFile(execType, firstInput);
             deleteInputFile(execType, secondInput);
         }
@@ -608,27 +616,27 @@ public class TestJoin extends TestCase {
                     "1\t",
                     "2\taa"
             };
-            
+
             String firstInput = createInputFile(execType, "a.txt", input1);
             String secondInput = createInputFile(execType, "b.txt", input2);
-            
+
             String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (a1:int, a2:chararray);" +
                     "b = load '"+ Util.encodeEscape(secondInput) +"' as (b1:int, b2:chararray);" +
                     "c = join a by (a1, a2), b by (b1, b2);";
             Util.registerMultiLineQuery(pigServer, script);
             Iterator<Tuple> it = pigServer.openIterator("c");
-            
+
             assertTrue(it.hasNext());
             Tuple t = it.next();
             assertTrue(t.toString().equals("(2,aa,2,aa)"));
-            
+
             assertFalse(it.hasNext());
-            
+
             deleteInputFile(execType, firstInput);
             deleteInputFile(execType, secondInput);
         }
     }
-    
+
     @Test
     public void testLiteralsForJoinAlgoSpecification1() throws Exception {
     	setUp(ExecType.LOCAL);
@@ -641,7 +649,7 @@ public class TestJoin extends TestCase {
         LOJoin join = (LOJoin)lp.getPredecessors( store ).get(0);
         assertEquals(JOINTYPE.MERGE, join.getJoinType());
     }
-    
+
     @Test
     public void testLiteralsForJoinAlgoSpecification2() throws Exception {
     	setUp(ExecType.LOCAL);
@@ -654,12 +662,12 @@ public class TestJoin extends TestCase {
         LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
         assertEquals(JOINTYPE.HASH, join.getJoinType());
     }
-    
+
     @Test
     public void testLiteralsForJoinAlgoSpecification5() throws Exception {
     	setUp(ExecType.LOCAL);
         String query = "a = load 'A'; " +
-                       "b = load 'B'; " + 
+                       "b = load 'B'; " +
                        "c = Join a by $0, b by $0 using 'default'; "+
                        "store c into 'output';";
         LogicalPlan lp = Util.buildLp(pigServer, query);
@@ -667,7 +675,7 @@ public class TestJoin extends TestCase {
         LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
        assertEquals(JOINTYPE.HASH, join.getJoinType());
     }
-    
+
     @Test
     public void testLiteralsForJoinAlgoSpecification3() throws Exception {
     	setUp(ExecType.LOCAL);
@@ -680,11 +688,11 @@ public class TestJoin extends TestCase {
         LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
         assertEquals(JOINTYPE.REPLICATED, join.getJoinType());
     }
-    
+
     @Test
     public void testLiteralsForJoinAlgoSpecification4() throws Exception {
     	setUp(ExecType.LOCAL);
-        String query = "a = load 'A'; " + 
+        String query = "a = load 'A'; " +
                        "b = load 'B'; " +
                        "c = Join a by $0, b by $0 using 'replicated'; "+
                        "store c into 'output';";
@@ -693,4 +701,29 @@ public class TestJoin extends TestCase {
         LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
        assertEquals(JOINTYPE.REPLICATED, join.getJoinType());
     }
+
+    // See: https://issues.apache.org/jira/browse/PIG-3093
+    @Test
+    public void testIndirectSelfJoinRealias() throws Exception {
+        setUp(ExecType.LOCAL);
+        Data data = resetData(pigServer);
+
+        Set<Tuple> tuples = Sets.newHashSet(tuple("a"), tuple("b"), tuple("c"));
+        data.set("foo", Utils.getSchemaFromString("field1:chararray"), tuples);
+        pigServer.registerQuery("A = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("B = foreach A generate *;");
+        pigServer.registerQuery("C = join A by field1, B by field1;");
+        assertEquals(Utils.getSchemaFromString("A::field1:chararray, B::field1:chararray"), pigServer.dumpSchema("C"));
+        pigServer.registerQuery("D = foreach C generate B::field1, A::field1 as field2;");
+        assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("D"));
+        pigServer.registerQuery("E = foreach D generate field1, field2;");
+        assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("E"));
+        pigServer.registerQuery("F = foreach E generate field2;");
+        Iterator<Tuple> it = pigServer.openIterator("F");
+        assertTrue(it.hasNext());
+        while (it.hasNext()) {
+            assertTrue(tuples.remove(it.next()));
+        }
+        assertFalse(it.hasNext());
+    }
 }