You are viewing a plain-text version of this content. The link to the canonical version (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by th...@apache.org on 2010/10/09 02:19:17 UTC

svn commit: r1006082 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/

Author: thejas
Date: Sat Oct  9 00:19:16 2010
New Revision: 1006082

URL: http://svn.apache.org/viewvc?rev=1006082&view=rev
Log:
PIG-1672: order of relations in replicated join gets switched in a query where
 first relation has two mergeable foreach statements

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
    pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1006082&r1=1006081&r2=1006082&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Oct  9 00:19:16 2010
@@ -209,6 +209,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1672: order of relations in replicated join gets switched in a query where
+ first relation has two mergeable foreach statements (thejas)
+
 PIG-1666: union onschema fails when the input relation has cast from bytearray to another type (thejas)
 
 PIG-1655: code duplicated for udfs that were moved from piggybank to builtin (nrai via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1006082&r1=1006081&r2=1006082&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Sat Oct  9 00:19:16 2010
@@ -345,7 +345,7 @@ public class POFRJoin extends PhysicalOp
                 key.set(0, tuple.get(1));
                 Tuple value = getValueTuple(lr, tuple);
                 if (!replicate.containsKey(key))
-                    replicate.put(key, new ArrayList<Tuple>());
+                    replicate.put(key, new ArrayList<Tuple>(1));
                 replicate.get(key).add(value);
             }
             replicates[i] = replicate;
@@ -390,8 +390,8 @@ public class POFRJoin extends PhysicalOp
 
             // we have some fields of the "value" in the
             // "key".
-            retTup = mTupleFactory.newTuple();
             int finalValueSize = keyLookupSize + val.size();
+            retTup = mTupleFactory.newTuple(finalValueSize);
             int valIndex = 0; // an index for accessing elements from
             // the value (val) that we have currently
             for (int i = 0; i < finalValueSize; i++) {
@@ -400,16 +400,16 @@ public class POFRJoin extends PhysicalOp
                     // the field for this index is not in the
                     // key - so just take it from the "value"
                     // we were handed
-                    retTup.append(val.get(valIndex));
+                    retTup.set(i, val.get(valIndex));
                     valIndex++;
                 } else {
                     // the field for this index is in the key
                     if (isKeyTuple) {
                         // the key is a tuple, extract the
                         // field out of the tuple
-                        retTup.append(keyAsTuple.get(keyIndex));
+                        retTup.set(i, keyAsTuple.get(keyIndex));
                     } else {
-                        retTup.append(key);
+                        retTup.set(i, key);
                     }
                 }
             }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=1006082&r1=1006081&r2=1006082&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java Sat Oct  9 00:19:16 2010
@@ -261,8 +261,6 @@ public class MergeForEach extends Rule {
                 }
             }
             // remove foreach1, foreach2, add new foreach
-            Operator pred = currentPlan.getPredecessors(foreach1).get(0);
-            Operator succ = currentPlan.getSuccessors(foreach2).get(0);
             
             // rebuild soft link
             Collection<Operator> newSoftLinkPreds = Utils.mergeCollection(currentPlan.getSoftLinkPredecessors(foreach1), 
@@ -290,16 +288,9 @@ public class MergeForEach extends Rule {
                 }
             }
             
-            Pair<Integer, Integer> pos = currentPlan.disconnect(pred, foreach1);
-            currentPlan.disconnect(foreach1, foreach2);
-            currentPlan.disconnect(foreach2, succ);
-            currentPlan.remove(foreach1);
-            currentPlan.remove(foreach2);
-
-            currentPlan.add(newForEach);
-            currentPlan.connect(pred, pos.first, newForEach, pos.second);
-            currentPlan.connect(newForEach, succ);
-            
+            currentPlan.removeAndReconnect(foreach1);
+            currentPlan.replace(foreach2, newForEach);
+                       
             if (newSoftLinkPreds!=null) {
                 for (Operator softPred : newSoftLinkPreds) {
                     currentPlan.createSoftLink(softPred, newForEach);

Modified: pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=1006082&r1=1006081&r2=1006082&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java Sat Oct  9 00:19:16 2010
@@ -18,6 +18,8 @@
 
 package org.apache.pig.test;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.util.*;
 
@@ -26,7 +28,10 @@ import org.apache.pig.newplan.logical.Lo
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
 import org.apache.pig.newplan.logical.rules.AddForEach;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.MergeForEach;
@@ -210,6 +215,48 @@ public class TestMergeForEachOptimizatio
         Assert.assertEquals( 2, forEachCount1 );
         Assert.assertEquals( 2, forEachCount2 );
     }
+    
+    
+    /**
+     * Ensure that join input order does not get reversed (PIG-1672)
+     * 
+     * @throws IOException
+     */
+    @Test   
+    public void testJoinInputOrder() throws IOException  {
+        LogicalPlanTester lpt = new LogicalPlanTester( pc );
+        lpt.buildPlan( "l1 = load 'y' as (a);" );
+        lpt.buildPlan( "l2 = load 'z' as (a1,b1,c1,d1);" );
+        lpt.buildPlan( "f1 = foreach l2 generate a1 as a, b1 as b, c1 as c, d1 as d;" );
+        lpt.buildPlan( "f2 = foreach f1 generate a,b,c;" );
+        lpt.buildPlan( "j1 = join f2 by a, l1 by a using 'replicated';" );
+        
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store j1 into 'empty';" );  
+        LogicalPlan newLogicalPlan = migratePlan( plan );
+
+        int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
+        int outputExprCount1 = getOutputExprCount( newLogicalPlan );
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        
+        int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+        Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
+        int outputExprCount2 = getOutputExprCount( newLogicalPlan );
+        Assert.assertTrue( outputExprCount1 == outputExprCount2 );
+        LOForEach foreach2 = getForEachOperator( newLogicalPlan );
+        Assert.assertTrue( foreach2.getAlias().equals( "f2" ) );
+        
+        LOJoin join = (LOJoin)getOperator(newLogicalPlan, LOJoin.class);
+        LogicalRelationalOperator leftInp =
+            (LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(0);
+        assertEquals("join child left", leftInp.getAlias(), "f2"); 
+        
+        LogicalRelationalOperator rightInp =
+            (LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(1);
+        assertEquals("join child right", rightInp.getAlias(), "l1"); 
+        
+    }
 
     private int getForEachOperatorCount(LogicalPlan plan) {
         Iterator<Operator> ops = plan.getOperators();
@@ -243,6 +290,25 @@ public class TestMergeForEachOptimizatio
         }
         return null;
     }
+    
+    /**
+     * returns first operator that is an instance of given class c
+     * @param plan
+     * @param c
+     * @return instance of class c if any, or null
+     * @throws IOException
+     */
+    private Operator getOperator(LogicalPlan plan, Class<? extends Operator> c) throws IOException {
+        Iterator<Operator> ops = plan.getOperators();
+        while( ops.hasNext() ) {
+            Operator op = ops.next();
+            if( op.getClass().equals(c)) {
+                return op;          
+            }
+        }
+        return null;
+    }
+    
 
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {