You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/05/10 22:47:27 UTC

svn commit: r1101638 - in /pig/branches/branch-0.9: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java test/org/apache/pig/test/TestMRCompiler.java

Author: daijy
Date: Tue May 10 20:47:27 2011
New Revision: 1101638

URL: http://svn.apache.org/viewvc?rev=1101638&view=rev
Log:
PIG-2030: Merged join/cogroup does not automatically ship loader

Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1101638&r1=1101637&r2=1101638&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Tue May 10 20:47:27 2011
@@ -178,6 +178,8 @@ PIG-1696: Performance: Use System.arrayc
 
 BUG FIXES
 
+PIG-2030: Merged join/cogroup does not automatically ship loader (daijy)
+
 PIG-2052: Ship guava.jar to backend (daijy)
 
 PIG-2012: Comments at the begining of the file throws off line numbers in errors (rding)

Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1101638&r1=1101637&r2=1101638&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue May 10 20:47:27 2011
@@ -1472,6 +1472,8 @@ public class MRCompiler extends PhyPlanV
             poCoGrp.setIndexFileName(idxFileSpec.getFileName());
             
             baseMROp.mapPlan.addAsLeaf(poCoGrp);
+            for (FuncSpec funcSpec : funcSpecs)
+                baseMROp.UDFs.add(funcSpec.toString());
             MRPlan.add(indexerMROp);
             MRPlan.connect(indexerMROp, baseMROp);
 
@@ -1548,6 +1550,7 @@ public class MRCompiler extends PhyPlanV
         idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
                 new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
         indexerMROp.mapPlan.add(idxJobLoader);
+        indexerMROp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
         
         // Loader of mro will return a tuple of form - 
         // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
@@ -1692,9 +1695,11 @@ public class MRCompiler extends PhyPlanV
             POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
             joinOp.setSignature(rightLoader.getSignature());
             LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
+            List<String> udfs = new ArrayList<String>();
             if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
                 joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
                 joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+                udfs.add(rightLoader.getLFile().getFuncSpec().toString());
                 
                 // we don't need the right MROper since
                 // the right loader is an IndexableLoadFunc which can handle the index
@@ -1777,6 +1782,7 @@ public class MRCompiler extends PhyPlanV
                 joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());  
                 
                 joinOp.setIndexFile(strFile.getFileName());
+                udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
             }
             
             // We are done with right side. Lets work on left now.
@@ -1807,6 +1813,7 @@ public class MRCompiler extends PhyPlanV
             // no combination of small splits as there is currently no way to guarantee the sortness
             // of the combined splits.
             curMROp.noCombineSmallSplits();
+            curMROp.UDFs.addAll(udfs);
         }
         catch(PlanException e){
             int errCode = 2034;

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java?rev=1101638&r1=1101637&r2=1101638&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java Tue May 10 20:47:27 2011
@@ -20,19 +20,24 @@ package org.apache.pig.test;
 import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.AVG;
 import org.apache.pig.builtin.COUNT;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.SUM;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -1063,7 +1068,54 @@ public class TestMRCompiler extends juni
         assertEquals(goldenPlan, compiledPlan);
     }
 
-
+    public static class TestCollectableLoadFunc extends PigStorage implements CollectableLoadFunc {
+        @Override
+        public void ensureAllKeyInstancesInSameSplit() throws IOException {
+        }
+    }
     
+    public static class TestIndexableLoadFunc extends PigStorage implements IndexableLoadFunc {
+        @Override
+        public void initialize(Configuration conf) throws IOException {
+        }
+
+        @Override
+        public void seekNear(Tuple keys) throws IOException {
+        }
 
+        @Override
+        public void close() throws IOException {
+        }
+    }
+    
+    @Test
+    public void testUDFInMergedCoGroup() throws Exception {
+        String query = "a = load 'input1' using " + TestCollectableLoadFunc.class.getName() + "();" +
+            "b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
+            "c = cogroup a by $0, b by $0 using 'merge';" +
+            "store c into 'output';";
+        
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        
+        assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
+        mrOper = mrPlan.getSuccessors(mrOper).get(0);
+        assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
+        assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
+    }
+    
+    @Test
+    public void testUDFInMergedJoin() throws Exception {
+        String query = "a = load 'input1';" + 
+            "b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
+            "c = join a by $0, b by $0 using 'merge';" +
+            "store c into 'output';";
+        
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        
+        assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
+    }
 }