You are viewing a plain-text version of this content; the link to the canonical version was a hyperlink that is not preserved in this copy.
Posted to commits@pig.apache.org by th...@apache.org on 2010/10/09 02:19:17 UTC
svn commit: r1006082 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/
Author: thejas
Date: Sat Oct 9 00:19:16 2010
New Revision: 1006082
URL: http://svn.apache.org/viewvc?rev=1006082&view=rev
Log:
PIG-1672: order of relations in replicated join gets switched in a query where
first relation has two mergeable foreach statements
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1006082&r1=1006081&r2=1006082&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Oct 9 00:19:16 2010
@@ -209,6 +209,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1672: order of relations in replicated join gets switched in a query where
+ first relation has two mergeable foreach statements (thejas)
+
PIG-1666: union onschema fails when the input relation has cast from bytearray to another type (thejas)
PIG-1655: code duplicated for udfs that were moved from piggybank to builtin (nrai via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1006082&r1=1006081&r2=1006082&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Sat Oct 9 00:19:16 2010
@@ -345,7 +345,7 @@ public class POFRJoin extends PhysicalOp
key.set(0, tuple.get(1));
Tuple value = getValueTuple(lr, tuple);
if (!replicate.containsKey(key))
- replicate.put(key, new ArrayList<Tuple>());
+ replicate.put(key, new ArrayList<Tuple>(1));
replicate.get(key).add(value);
}
replicates[i] = replicate;
@@ -390,8 +390,8 @@ public class POFRJoin extends PhysicalOp
// we have some fields of the "value" in the
// "key".
- retTup = mTupleFactory.newTuple();
int finalValueSize = keyLookupSize + val.size();
+ retTup = mTupleFactory.newTuple(finalValueSize);
int valIndex = 0; // an index for accessing elements from
// the value (val) that we have currently
for (int i = 0; i < finalValueSize; i++) {
@@ -400,16 +400,16 @@ public class POFRJoin extends PhysicalOp
// the field for this index is not in the
// key - so just take it from the "value"
// we were handed
- retTup.append(val.get(valIndex));
+ retTup.set(i, val.get(valIndex));
valIndex++;
} else {
// the field for this index is in the key
if (isKeyTuple) {
// the key is a tuple, extract the
// field out of the tuple
- retTup.append(keyAsTuple.get(keyIndex));
+ retTup.set(i, keyAsTuple.get(keyIndex));
} else {
- retTup.append(key);
+ retTup.set(i, key);
}
}
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=1006082&r1=1006081&r2=1006082&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java Sat Oct 9 00:19:16 2010
@@ -261,8 +261,6 @@ public class MergeForEach extends Rule {
}
}
// remove foreach1, foreach2, add new foreach
- Operator pred = currentPlan.getPredecessors(foreach1).get(0);
- Operator succ = currentPlan.getSuccessors(foreach2).get(0);
// rebuild soft link
Collection<Operator> newSoftLinkPreds = Utils.mergeCollection(currentPlan.getSoftLinkPredecessors(foreach1),
@@ -290,16 +288,9 @@ public class MergeForEach extends Rule {
}
}
- Pair<Integer, Integer> pos = currentPlan.disconnect(pred, foreach1);
- currentPlan.disconnect(foreach1, foreach2);
- currentPlan.disconnect(foreach2, succ);
- currentPlan.remove(foreach1);
- currentPlan.remove(foreach2);
-
- currentPlan.add(newForEach);
- currentPlan.connect(pred, pos.first, newForEach, pos.second);
- currentPlan.connect(newForEach, succ);
-
+ currentPlan.removeAndReconnect(foreach1);
+ currentPlan.replace(foreach2, newForEach);
+
if (newSoftLinkPreds!=null) {
for (Operator softPred : newSoftLinkPreds) {
currentPlan.createSoftLink(softPred, newForEach);
Modified: pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=1006082&r1=1006081&r2=1006082&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java Sat Oct 9 00:19:16 2010
@@ -18,6 +18,8 @@
package org.apache.pig.test;
+import static org.junit.Assert.*;
+
import java.io.IOException;
import java.util.*;
@@ -26,7 +28,10 @@ import org.apache.pig.newplan.logical.Lo
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.rules.AddForEach;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.MergeForEach;
@@ -210,6 +215,48 @@ public class TestMergeForEachOptimizatio
Assert.assertEquals( 2, forEachCount1 );
Assert.assertEquals( 2, forEachCount2 );
}
+
+
+ /**
+ * Ensure that join input order does not get reversed (PIG-1672):
+ * after f1 is merged into f2, f2 must remain the first join input.
+ * @throws IOException
+ */
+ @Test
+ public void testJoinInputOrder() throws IOException {
+ LogicalPlanTester lpt = new LogicalPlanTester( pc );
+ lpt.buildPlan( "l1 = load 'y' as (a);" );
+ lpt.buildPlan( "l2 = load 'z' as (a1,b1,c1,d1);" );
+ lpt.buildPlan( "f1 = foreach l2 generate a1 as a, b1 as b, c1 as c, d1 as d;" );
+ lpt.buildPlan( "f2 = foreach f1 generate a,b,c;" );
+ lpt.buildPlan( "j1 = join f2 by a, l1 by a using 'replicated';" );
+
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store j1 into 'empty';" );
+ LogicalPlan newLogicalPlan = migratePlan( plan );
+
+ int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
+ int outputExprCount1 = getOutputExprCount( newLogicalPlan );
+
+ PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+ optimizer.optimize();
+
+ int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+ Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
+ int outputExprCount2 = getOutputExprCount( newLogicalPlan );
+ Assert.assertEquals( outputExprCount1, outputExprCount2 );
+ LOForEach foreach2 = getForEachOperator( newLogicalPlan );
+ Assert.assertEquals( "f2", foreach2.getAlias() );
+
+ LOJoin join = (LOJoin)getOperator(newLogicalPlan, LOJoin.class);
+ Assert.assertNotNull( "join operator not found", join );
+ LogicalRelationalOperator leftInp =
+ (LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(0);
+ assertEquals("join child left", "f2", leftInp.getAlias());
+
+ LogicalRelationalOperator rightInp =
+ (LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(1);
+ assertEquals("join child right", "l1", rightInp.getAlias());
+ }
private int getForEachOperatorCount(LogicalPlan plan) {
Iterator<Operator> ops = plan.getOperators();
@@ -243,6 +290,25 @@ public class TestMergeForEachOptimizatio
}
return null;
}
+
+ /**
+ * Returns the first operator in the plan that is an instance of c
+ * (operators whose class is a subclass of c match as well).
+ * @param plan the logical plan whose operators are scanned
+ * @param c the operator class to look for
+ * @return the first matching operator, or null if none is found
+ */
+ private Operator getOperator(LogicalPlan plan, Class<? extends Operator> c) {
+ Iterator<Operator> ops = plan.getOperators();
+ while( ops.hasNext() ) {
+ Operator op = ops.next();
+ if( c.isInstance(op) ) {
+ return op;
+ }
+ }
+ return null;
+ }
+
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {