You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/02 21:04:45 UTC
svn commit: r832084 - in /hadoop/pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
test/org/apache/pig/test/TestCombiner.java
Author: pradeepkth
Date: Mon Nov 2 20:04:45 2009
New Revision: 832084
URL: http://svn.apache.org/viewvc?rev=832084&view=rev
Log:
PIG-1030: explain and dump not working with two UDFs inside inner plan of foreach (rding via pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=832084&r1=832083&r2=832084&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov 2 20:04:45 2009
@@ -109,6 +109,9 @@
BUG FIXES
+PIG-1030: explain and dump not working with two UDFs inside inner plan of
+foreach (rding via pradeepkth)
+
PIG-1048: inner join using 'skewed' produces multiple rows for keys with
single row in both input relations (sriranjan via gates)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=832084&r1=832083&r2=832084&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Mon Nov 2 20:04:45 2009
@@ -712,6 +712,7 @@
// we apparently have seen a PODistinct before, so lets not
// combine.
sawNonAlgebraic = true;
+ return;
}
// check that this distinct is the only input to an agg
// We could have the following two cases
@@ -748,6 +749,16 @@
PhysicalOperator leaf = mPlan.getLeaves().get(0);
// the leaf has to be a POUserFunc (need not be algebraic)
if(leaf instanceof POUserFunc) {
+
+ // we want to combine only in the case where there is only
+ // one PODistinct which is the only input to an agg.
+ // Do not combine if there are additional inputs.
+ List<PhysicalOperator> preds = mPlan.getPredecessors(leaf);
+ if (preds.size() > 1) {
+ sawNonAlgebraic = true;
+ return;
+ }
+
List<PhysicalOperator> immediateSuccs = mPlan.getSuccessors(distinct);
if(immediateSuccs.size() == 1 && immediateSuccs.get(0) instanceof POProject) {
if(checkSuccessorIsLeaf(leaf, immediateSuccs.get(0))) { // script 1 above
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=832084&r1=832083&r2=832084&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java Mon Nov 2 20:04:45 2009
@@ -33,6 +33,7 @@
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
@@ -341,4 +342,53 @@
}
}
+ public static class JiraPig1030 extends EvalFunc<String> {
+
+ public String exec(Tuple input) throws IOException {
+ return "";
+ }
+ }
+
+ @Test
+ public void testJiraPig1030() {
+ // test that combiner is NOT invoked when
+ // one of the elements in the foreach generate
+ // has a non-algebraic UDF that has multiple inputs
+ // (one of them is distinct).
+
+ String input[] = {
+ "pig1\t18\t2.1",
+ "pig2\t24\t3.3",
+ "pig5\t45\t2.4",
+ "pig1\t18\t2.1",
+ "pig1\t19\t2.1",
+ "pig2\t24\t4.5",
+ "pig1\t20\t3.1" };
+
+ try {
+ Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
+ pigServer.registerQuery("b = group a all;");
+ pigServer.registerQuery("c = foreach b {" +
+ " d = distinct a.age;" +
+ " generate group, " + JiraPig1030.class.getName() + "(d, 0);};");
+
+ // make sure there isn't a combine plan in the explain output
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ pigServer.explain("c", ps);
+ assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ try {
+ Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
}