You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/05/12 21:05:00 UTC

svn commit: r1743565 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/ test/org/apache/pig/data/

Author: rohini
Date: Thu May 12 21:05:00 2016
New Revision: 1743565

URL: http://svn.apache.org/viewvc?rev=1743565&view=rev
Log:
PIG-4874: Remove schema tuple reference overhead for replicate join hashmap (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
    pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1743565&r1=1743564&r2=1743565&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu May 12 21:05:00 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4874: Remove schema tuple reference overhead for replicate join hashmap (rohini)
+
 PIG-4879: Pull latest version of joda-time (rohini)
 
 PIG-4526: Make setting up the build environment easier (nielsbasjes via rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1743565&r1=1743564&r2=1743565&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Thu May 12 21:05:00 2016
@@ -97,7 +97,7 @@ public class POFRJoin extends PhysicalOp
 
     // The array of Hashtables one per replicated input. replicates[fragment] =
     // null fragment is the input which is fragmented and not replicated.
-    protected transient TupleToMapKey replicates[];
+    protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates;
     // varaible which denotes whether we are returning tuples from the foreach
     // operator
     protected transient boolean processingPlan;
@@ -234,7 +234,10 @@ public class POFRJoin extends PhysicalOp
         Result res = null;
         Result inp = null;
         if (!setUp) {
-            replicates = new TupleToMapKey[phyPlanLists.size()];
+            replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size());
+            for (int i = 0 ; i < phyPlanLists.size(); i++) {
+                replicates.add(null);
+            }
             dumTup = mTupleFactory.newTuple(1);
             setUpHashMap();
             setUp = true;
@@ -282,8 +285,7 @@ public class POFRJoin extends PhysicalOp
                 return new Result();
             }
             Tuple lrOutTuple = (Tuple) lrOut.result;
-            Tuple key = mTupleFactory.newTuple(1);
-            key.set(0, lrOutTuple.get(1));
+            Object key = lrOutTuple.get(1);
             Tuple value = getValueTuple(lr, lrOutTuple);
             lr.detachInput();
             // Configure the for each operator with the relevant bags
@@ -296,7 +298,7 @@ public class POFRJoin extends PhysicalOp
                     ce.setValue(value);
                     continue;
                 }
-                TupleToMapKey replicate = replicates[i];
+                Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i);
                 if (replicate.get(key) == null) {
                     if (isLeftOuterJoin) {
                         ce.setValue(nullBag);
@@ -304,7 +306,7 @@ public class POFRJoin extends PhysicalOp
                     noMatch = true;
                     break;
                 }
-                ce.setValue(new NonSpillableDataBag(replicate.get(key).getList()));
+                ce.setValue(new NonSpillableDataBag(replicate.get(key)));
             }
 
             // If this is not LeftOuter Join and there was no match we
@@ -327,27 +329,28 @@ public class POFRJoin extends PhysicalOp
         }
     }
 
-    protected static class TupleToMapKey {
-        private HashMap<Tuple, TuplesToSchemaTupleList> tuples;
+    protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> {
         private SchemaTupleFactory tf;
 
         public TupleToMapKey(int ct, SchemaTupleFactory tf) {
-            tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct);
+            super(ct);
             this.tf = tf;
         }
 
-        public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) {
-            if (tf != null) {
-                key = TuplesToSchemaTupleList.convert(key, tf);
+        @Override
+        public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) {
+            if (tf != null && key instanceof Tuple) {
+                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
             }
-            return tuples.put(key, val);
+            return (TuplesToSchemaTupleList) super.put(key, val);
         }
 
-        public TuplesToSchemaTupleList get(Tuple key) {
-            if (tf != null) {
-                key = TuplesToSchemaTupleList.convert(key, tf);
+        @Override
+        public TuplesToSchemaTupleList get(Object key) {
+            if (tf != null && key instanceof Tuple) {
+                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
             }
-            return tuples.get(key);
+            return (TuplesToSchemaTupleList) super.get(key);
         }
     }
 
@@ -382,7 +385,7 @@ public class POFRJoin extends PhysicalOp
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
 
             if (i == fragment) {
-                replicates[i] = null;
+                replicates.set(i, null);
                 continue;
             }
 
@@ -401,25 +404,34 @@ public class POFRJoin extends PhysicalOp
             POLocalRearrange lr = LRs[i];
             lr.setInputs(Arrays.asList((PhysicalOperator) ld));
 
-            TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+            Map<Object, ArrayList<Tuple>> replicate;
+            if (keySchemaTupleFactory == null) {
+                replicate = new HashMap<Object, ArrayList<Tuple>>(1000);
+            } else {
+                replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+            }
 
             log.debug("Completed setup. Trying to build replication hash table");
             for (Result res = lr.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNextTuple()) {
                 if (getReporter() != null)
                     getReporter().progress();
                 Tuple tuple = (Tuple) res.result;
-                if (isKeyNull(tuple.get(1))) continue;
-                Tuple key = mTupleFactory.newTuple(1);
-                key.set(0, tuple.get(1));
+                Object key = tuple.get(1);
+                if (isKeyNull(key)) continue;
                 Tuple value = getValueTuple(lr, tuple);
 
-                if (replicate.get(key) == null) {
-                    replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+                ArrayList<Tuple> values = replicate.get(key);
+                if (values == null) {
+                    if (inputSchemaTupleFactory == null) {
+                        values = new ArrayList<Tuple>(1);
+                    } else {
+                        values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
+                    }
+                    replicate.put(key, values);
                 }
-
-                replicate.get(key).add(value);
+                values.add(value);
             }
-            replicates[i] = replicate;
+            replicates.set(i, replicate);
         }
         long time2 = System.currentTimeMillis();
         log.debug("Hash Table built. Time taken: " + (time2 - time1));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1743565&r1=1743564&r2=1743565&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Thu May 12 21:05:00 2016
@@ -56,11 +56,11 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 
-/** This operator implements merge join algorithm to do map side joins. 
+/** This operator implements merge join algorithm to do map side joins.
  *  Currently, only two-way joins are supported. One input of join is identified as left
  *  and other is identified as right. Left input tuples are the input records in map.
  *  Right tuples are read from HDFS by opening right stream.
- *  
+ *
  *    This join doesn't support outer join.
  *    Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
  */
@@ -99,7 +99,7 @@ public class POMergeJoin extends Physica
     private FuncSpec rightLoaderFuncSpec;
 
     private String rightInputFileName;
-    
+
     private String indexFile;
 
     // Buffer to hold accumulated left tuples.
@@ -231,12 +231,11 @@ public class POMergeJoin extends Physica
      * from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples
      * from the source, though in the future that is what we would like to do.
      */
-    public static class TuplesToSchemaTupleList {
-        private List<Tuple> tuples;
+    public static class TuplesToSchemaTupleList extends ArrayList<Tuple> {
         private SchemaTupleFactory tf;
 
         public TuplesToSchemaTupleList(int ct, TupleMaker<?> tf) {
-            tuples = new ArrayList<Tuple>(ct);
+            super(ct);
             if (tf instanceof SchemaTupleFactory) {
                 this.tf = (SchemaTupleFactory)tf;
             }
@@ -255,24 +254,24 @@ public class POMergeJoin extends Physica
             }
         }
 
+        @Override
         public boolean add(Tuple t) {
             if (tf != null) {
                 t = convert(t, tf);
             }
-            return tuples.add(t);
+            return super.add(t);
         }
 
+        @Override
         public Tuple get(int i) {
-            return tuples.get(i);
+            return super.get(i);
         }
 
+        @Override
         public int size() {
-            return tuples.size();
+            return super.size();
         }
 
-        public List<Tuple> getList() {
-            return tuples;
-        }
     }
 
     @SuppressWarnings("unchecked")
@@ -339,7 +338,7 @@ public class POMergeJoin extends Physica
                 }
                 else{
                     Object rightKey = extractKeysFromTuple(rightInp, 1);
-                    if(null == rightKey) // If we see tuple having null keys in stream, we drop them 
+                    if(null == rightKey) // If we see tuple having null keys in stream, we drop them
                         continue;       // and fetch next tuple.
 
                     int cmpval = ((Comparable)rightKey).compareTo(curJoinKey);
@@ -381,7 +380,7 @@ public class POMergeJoin extends Physica
                             "Last two tuples encountered were: \n"+
                         curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
                         throw new ExecException(errMsg,errCode);
-                    }    
+                    }
                 }
             }
         }
@@ -412,14 +411,14 @@ public class POMergeJoin extends Physica
                 prevLeftKey+ "\n" + curLeftKey ;
                 throw new ExecException(errMsg,errCode);
             }
- 
+
         case POStatus.STATUS_EOP:
             if(this.parentPlan.endOfAllInput){
-                // We hit the end on left input. 
+                // We hit the end on left input.
                 // Tuples in bag may still possibly join with right side.
                 curJoinKey = prevLeftKey;
                 curLeftKey = null;
-                break;                
+                break;
             }
             else    // Fetch next left input.
                 return curLeftInp;
@@ -442,7 +441,7 @@ public class POMergeJoin extends Physica
         // Accumulated tuples with same key on left side.
         // But since we are reading ahead we still haven't checked the read ahead right tuple.
         // Accumulated left tuples may potentially join with that. So, lets check that first.
-        
+
         if((null != prevRightKey) && prevRightKey.equals(prevLeftKey)){
 
             curJoiningRightTup = (Tuple)prevRightInp.result;
@@ -464,17 +463,17 @@ public class POMergeJoin extends Physica
                 slidingToNextRecord = false;
             } else
                 rightInp = getNextRightInp(prevLeftKey);
-                
+
             if(rightInp.returnStatus != POStatus.STATUS_OK)
                 return rightInp;
 
             Object extractedRightKey = extractKeysFromTuple(rightInp, 1);
-            
-            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them 
+
+            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
                 continue;       // and fetch next tuple.
-            
+
             Comparable rightKey = (Comparable)extractedRightKey;
-            
+
             if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
                 // Sanity check.
                 int errCode = 1102;
@@ -505,7 +504,7 @@ public class POMergeJoin extends Physica
             else{    // We got ahead on right side. Store currently read right tuple.
                 prevRightKey = rightKey;
                 prevRightInp = rightInp;
-                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. 
+                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
                 leftTuples = newLeftTupleArray();
                 leftTuples.add((Tuple)curLeftInp.result);
                 prevLeftInp = curLeftInp;
@@ -532,7 +531,7 @@ public class POMergeJoin extends Physica
             DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
             loader.setIndexFile(indexFile);
         }
-        
+
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
         rightLoader.setUDFContextSignature(signature);
@@ -585,11 +584,11 @@ public class POMergeJoin extends Physica
                         // run the tuple through the pipeline
                         rightPipelineRoot.attachInput(t);
                         return this.getNextRightInp();
-                        
+
                     }
                     default: // We don't deal with ERR/NULL. just pass them down
                         throwProcessingException(false, null);
-                        
+
                 }
             }
         } catch (IOException e) {
@@ -620,8 +619,8 @@ public class POMergeJoin extends Physica
             int errCode = 2167;
             String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
             throw new ExecException(errMsg,errCode,PigException.BUG);
-        } 
-          
+        }
+
         return ((Tuple) lrOut.result).get(1);
     }
 
@@ -637,7 +636,7 @@ public class POMergeJoin extends Physica
             noInnerPlanOnRightSide = false;
             this.rightPipelineLeaf = rightPipeline.getLeaves().get(0);
             this.rightPipelineRoot = rightPipeline.getRoots().get(0);
-            this.rightPipelineRoot.setInputs(null);            
+            this.rightPipelineRoot.setInputs(null);
         }
         else
             noInnerPlanOnRightSide = true;
@@ -688,18 +687,18 @@ public class POMergeJoin extends Physica
     public boolean supportsMultipleOutputs() {
         return false;
     }
-    
+
     /**
      * @param rightInputFileName the rightInputFileName to set
      */
     public void setRightInputFileName(String rightInputFileName) {
         this.rightInputFileName = rightInputFileName;
     }
-    
+
     public String getSignature() {
         return signature;
     }
-    
+
     public void setSignature(String signature) {
         this.signature = signature;
     }
@@ -711,12 +710,12 @@ public class POMergeJoin extends Physica
     public String getIndexFile() {
         return indexFile;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
-    
+
     public LOJoin.JOINTYPE getJoinType() {
         return joinType;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1743565&r1=1743564&r2=1743565&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Thu May 12 21:05:00 2016
@@ -19,6 +19,8 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -118,6 +120,7 @@ public class POFRJoinTez extends POFRJoi
      *
      * @throws ExecException
      */
+    @SuppressWarnings("unchecked")
     @Override
     protected void setUpHashMap() throws ExecException {
 
@@ -125,8 +128,8 @@ public class POFRJoinTez extends POFRJoi
         // where same POFRJoinTez occurs in different Split sub-plans
         Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
         if (cacheValue != null) {
-            replicates = (TupleToMapKey[]) cacheValue;
-            log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
+            replicates =  (List<Map<? extends Object, ? extends List<Tuple>>>) cacheValue;
+            log.info("Found " + (replicates.size() - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
             return;
         }
 
@@ -152,7 +155,7 @@ public class POFRJoinTez extends POFRJoi
 
         long time1 = System.currentTimeMillis();
 
-        replicates[fragment] = null;
+        replicates.set(fragment, null);
         int inputIdx = 0;
         // We need to adjust the index because the number of replInputs is
         // one less than the number of inputSchemas. The inputSchemas
@@ -162,7 +165,12 @@ public class POFRJoinTez extends POFRJoi
             SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[schemaIdx];
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[schemaIdx];
 
-            TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
+            Map<Object, ArrayList<Tuple>> replicate;
+            if (keySchemaTupleFactory == null) {
+                replicate = new HashMap<Object, ArrayList<Tuple>>(4000);
+            } else {
+                replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
+            }
             POLocalRearrange lr = LRs[schemaIdx];
 
             try {
@@ -172,7 +180,8 @@ public class POFRJoinTez extends POFRJoi
                     }
 
                     PigNullableWritable key = (PigNullableWritable) replReaders.get(inputIdx).getCurrentKey();
-                    if (isKeyNull(key.getValueAsPigType())) continue;
+                    Object keyValue = key.getValueAsPigType();
+                    if (isKeyNull(keyValue)) continue;
                     NullableTuple val = (NullableTuple) replReaders.get(inputIdx).getCurrentValue();
 
                     // POFRJoin#getValueTuple() is reused to construct valTuple,
@@ -180,27 +189,31 @@ public class POFRJoinTez extends POFRJoi
                     // construct one here.
                     Tuple retTuple = mTupleFactory.newTuple(3);
                     retTuple.set(0, key.getIndex());
-                    retTuple.set(1, key.getValueAsPigType());
+                    retTuple.set(1, keyValue);
                     retTuple.set(2, val.getValueAsPigType());
                     Tuple valTuple = getValueTuple(lr, retTuple);
 
-                    Tuple keyTuple = mTupleFactory.newTuple(1);
-                    keyTuple.set(0, key.getValueAsPigType());
-                    if (replicate.get(keyTuple) == null) {
-                        replicate.put(keyTuple, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+                    ArrayList<Tuple> values = replicate.get(keyValue);
+                    if (values == null) {
+                        if (inputSchemaTupleFactory == null) {
+                            values = new ArrayList<Tuple>(1);
+                        } else {
+                            values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
+                        }
+                        replicate.put(keyValue, values);
                     }
-                    replicate.get(keyTuple).add(valTuple);
+                    values.add(valTuple);
                 }
             } catch (IOException e) {
                 throw new ExecException(e);
             }
-            replicates[schemaIdx] = replicate;
+            replicates.set(schemaIdx, replicate);
             inputIdx++;
             schemaIdx++;
         }
 
         long time2 = System.currentTimeMillis();
-        log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
+        log.info((replicates.size() - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
 
         ObjectCache.getInstance().cache(cacheKey, replicates);
         log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);

Modified: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1743565&r1=1743564&r2=1743565&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (original)
+++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Thu May 12 21:05:00 2016
@@ -17,9 +17,9 @@
  */
 package org.apache.pig.data;
 
-import static junit.framework.Assert.assertEquals;
 import static org.apache.pig.builtin.mock.Storage.resetData;
 import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -599,33 +599,33 @@ public class TestSchemaTuple {
         Data data = resetData(pigServer);
 
         data.set("foo1",
-            tuple(0),
-            tuple(1),
-            tuple(2),
-            tuple(3),
-            tuple(4),
-            tuple(5),
-            tuple(6),
-            tuple(7),
-            tuple(8),
-            tuple(9)
+            tuple(0, 0),
+            tuple(1, 1),
+            tuple(2, 2),
+            tuple(3, 3),
+            tuple(4, 4),
+            tuple(5, 5),
+            tuple(6, 6),
+            tuple(7, 7),
+            tuple(8, 8),
+            tuple(9, 9)
             );
 
         data.set("foo2",
-            tuple(0),
-            tuple(1),
-            tuple(2),
-            tuple(3),
-            tuple(4),
-            tuple(5),
-            tuple(6),
-            tuple(7),
-            tuple(8),
-            tuple(9)
+            tuple(0, 0),
+            tuple(1, 1),
+            tuple(2, 2),
+            tuple(3, 3),
+            tuple(4, 4),
+            tuple(5, 5),
+            tuple(6, 6),
+            tuple(7, 7),
+            tuple(8, 8),
+            tuple(9, 9)
             );
 
-        pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int);");
-        pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int);");
+        pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int, y:int);");
+        pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int, y:int);");
         if (preSort) {
             pigServer.registerQuery("A = ORDER A BY x ASC;");
             pigServer.registerQuery("B = ORDER B BY x ASC;");
@@ -638,20 +638,24 @@ public class TestSchemaTuple {
             if (!out.hasNext()) {
                 throw new Exception("Output should have had more elements! Failed on element: " + i);
             }
-            assertEquals(tuple(i, i), out.next());
+            assertEquals(tuple(i, i, i, i), out.next());
         }
         assertFalse(out.hasNext());
 
-        pigServer.registerQuery("STORE D INTO 'bar' USING mock.Storage();");
+        pigServer.registerQuery("STORE D INTO 'bar1' USING mock.Storage();");
+        pigServer.registerQuery("E = JOIN A by (x, y),  B by (x, y) using '"+joinType+"';");
+        pigServer.registerQuery("F = ORDER E BY $0 ASC;");
+        pigServer.registerQuery("STORE F INTO 'bar2' USING mock.Storage();");
 
-        List<Tuple> tuples = data.get("bar");
+        List<Tuple> bar1 = data.get("bar1");
+        List<Tuple> bar2 = data.get("bar2");
 
-        if (tuples.size() != 10) {
-            throw new Exception("Output does not have enough elements! List: " + tuples);
-        }
+        assertEquals("Output does not have enough elements! List: " + bar1, 10, bar1.size());
+        assertEquals("Output does not have enough elements! List: " + bar2, 10, bar2.size());
 
         for (int i = 0; i < 10; i++) {
-            assertEquals(tuple(i, i), tuples.get(i));
+            assertEquals(tuple(i, i, i, i), bar1.get(i));
+            assertEquals(tuple(i, i, i, i), bar2.get(i));
         }
 
     }