You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by da...@apache.org on 2011/05/10 22:46:50 UTC
svn commit: r1101637 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
test/org/apache/pig/test/TestMRCompiler.java
Author: daijy
Date: Tue May 10 20:46:49 2011
New Revision: 1101637
URL: http://svn.apache.org/viewvc?rev=1101637&view=rev
Log:
PIG-2030: Merged join/cogroup does not automatically ship loader
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1101637&r1=1101636&r2=1101637&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May 10 20:46:49 2011
@@ -214,6 +214,8 @@ PIG-1696: Performance: Use System.arrayc
BUG FIXES
+PIG-2030: Merged join/cogroup does not automatically ship loader (daijy)
+
PIG-2052: Ship guava.jar to backend (daijy)
PIG-2012: Comments at the begining of the file throws off line numbers in errors (rding)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1101637&r1=1101636&r2=1101637&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue May 10 20:46:49 2011
@@ -1472,6 +1472,8 @@ public class MRCompiler extends PhyPlanV
poCoGrp.setIndexFileName(idxFileSpec.getFileName());
baseMROp.mapPlan.addAsLeaf(poCoGrp);
+ for (FuncSpec funcSpec : funcSpecs)
+ baseMROp.UDFs.add(funcSpec.toString());
MRPlan.add(indexerMROp);
MRPlan.connect(indexerMROp, baseMROp);
@@ -1548,6 +1550,7 @@ public class MRCompiler extends PhyPlanV
idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
indexerMROp.mapPlan.add(idxJobLoader);
+ indexerMROp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
// Loader of mro will return a tuple of form -
// (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
@@ -1692,9 +1695,11 @@ public class MRCompiler extends PhyPlanV
POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
joinOp.setSignature(rightLoader.getSignature());
LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
+ List<String> udfs = new ArrayList<String>();
if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+ udfs.add(rightLoader.getLFile().getFuncSpec().toString());
// we don't need the right MROper since
// the right loader is an IndexableLoadFunc which can handle the index
@@ -1777,6 +1782,7 @@ public class MRCompiler extends PhyPlanV
joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
joinOp.setIndexFile(strFile.getFileName());
+ udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
}
// We are done with right side. Lets work on left now.
@@ -1807,6 +1813,7 @@ public class MRCompiler extends PhyPlanV
// no combination of small splits as there is currently no way to guarantee the sortness
// of the combined splits.
curMROp.noCombineSmallSplits();
+ curMROp.UDFs.addAll(udfs);
}
catch(PlanException e){
int errCode = 2034;
Modified: pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=1101637&r1=1101636&r2=1101637&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Tue May 10 20:46:49 2011
@@ -20,19 +20,24 @@ package org.apache.pig.test;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.AVG;
import org.apache.pig.builtin.COUNT;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.SUM;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -1063,7 +1068,54 @@ public class TestMRCompiler extends juni
assertEquals(goldenPlan, compiledPlan);
}
-
+ public static class TestCollectableLoadFunc extends PigStorage implements CollectableLoadFunc {
+ @Override
+ public void ensureAllKeyInstancesInSameSplit() throws IOException {
+ }
+ }
+ public static class TestIndexableLoadFunc extends PigStorage implements IndexableLoadFunc {
+ @Override
+ public void initialize(Configuration conf) throws IOException {
+ }
+
+ @Override
+ public void seekNear(Tuple keys) throws IOException {
+ }
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
+ @Test
+ public void testUDFInMergedCoGroup() throws Exception {
+ String query = "a = load 'input1' using " + TestCollectableLoadFunc.class.getName() + "();" +
+ "b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
+ "c = cogroup a by $0, b by $0 using 'merge';" +
+ "store c into 'output';";
+
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+ MapReduceOper mrOper = mrPlan.getRoots().get(0);
+
+ assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
+ mrOper = mrPlan.getSuccessors(mrOper).get(0);
+ assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
+ assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
+ }
+
+ @Test
+ public void testUDFInMergedJoin() throws Exception {
+ String query = "a = load 'input1';" +
+ "b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
+ "c = join a by $0, b by $0 using 'merge';" +
+ "store c into 'output';";
+
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+ MapReduceOper mrOper = mrPlan.getRoots().get(0);
+
+ assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
+ }
}