You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2009/07/17 03:29:17 UTC

svn commit: r794937 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/

Author: daijy
Date: Fri Jul 17 01:29:17 2009
New Revision: 794937

URL: http://svn.apache.org/viewvc?rev=794937&view=rev
Log:
PIG-888: Pig does not pass UDFs to the backend in some situations

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=794937&r1=794936&r2=794937&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jul 17 01:29:17 2009
@@ -40,6 +40,8 @@
 
 BUG FIXES
 
+    PIG-888: Pig do not pass udf to the backend in some situation (daijy)
+
     PIG-728: All backend error messages must be logged to preserve the
     original error messages (sms)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=794937&r1=794936&r2=794937&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Jul 17 01:29:17 2009
@@ -626,6 +626,11 @@
         for(MapReduceOper rmro : remLst){
             if(rmro.requestedParallelism > mergedMap.requestedParallelism)
                 mergedMap.requestedParallelism = rmro.requestedParallelism;
+            for (String udf:rmro.UDFs)
+            {
+                if (!mergedMap.UDFs.contains(udf))
+                    mergedMap.UDFs.add(udf);
+            }
             MRPlan.remove(rmro);
         }
         return ret;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=794937&r1=794936&r2=794937&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Fri Jul 17 01:29:17 2009
@@ -124,6 +124,7 @@
         // working dir to /user/<userid>
         if(pigContext.getExecType() == ExecType.MAPREDUCE)
             store.setActiveContainer(store.asContainer("/user/" + job.getUser()));
+        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
         wrapped.init(store);
         
         job.set("map.target.ops", ObjectSerializer.serialize(targetOps));

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=794937&r1=794936&r2=794937&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Fri Jul 17 01:29:17 2009
@@ -901,6 +901,22 @@
     }
 
 
+    @Test
+    public void testUDFInJoin() throws Exception {
+        planTester.buildPlan("a = load 'input1' using BinStorage();");
+        planTester.buildPlan("b = load 'input2';");
+        planTester.buildPlan("c = join a by $0, b by $0;");
+        LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+        
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        
+        assertTrue(mrOper.UDFs.size()==2);
+        assertTrue(mrOper.UDFs.get(0).equals("BinStorage"));
+        assertTrue(mrOper.UDFs.get(1).equals("org.apache.pig.builtin.PigStorage"));
+    }
+    
     public static class WeirdComparator extends ComparisonFunc {
 
         @Override