You are viewing a plain-text version of this content. The canonical link to it was not preserved in this rendering.
Posted to commits@pig.apache.org by da...@apache.org on 2012/03/23 02:04:36 UTC
svn commit: r1304153 - in /pig/branches/branch-0.10: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
test/org/apache/pig/test/
Author: daijy
Date: Fri Mar 23 01:04:36 2012
New Revision: 1304153
URL: http://svn.apache.org/viewvc?rev=1304153&view=rev
Log:
PIG-2442: Multiple Stores in pig streaming causes infinite waiting
Modified:
pig/branches/branch-0.10/CHANGES.txt
pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
pig/branches/branch-0.10/test/org/apache/pig/test/TestStreamingLocal.java
Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1304153&r1=1304152&r2=1304153&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Fri Mar 23 01:04:36 2012
@@ -364,6 +364,8 @@ Release 0.9.3 - Unreleased
BUG FIXES
+PIG-2442: Multiple Stores in pig streaming causes infinite waiting (daijy)
+
PIG-2609: e2e harness: make hdfs base path configurable (outside default.conf) (thw via daijy)
PIG-2576: Change in behavior for UDFContext.getUDFContext().getJobConf() in front-end (thw via daijy)
Modified: pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1304153&r1=1304152&r2=1304153&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Fri Mar 23 01:04:36 2012
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import java.util.List;
+
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
@@ -99,7 +101,15 @@ public class PhyPlanSetter extends PhyPl
@Override
public void visitSplit(POSplit spl) throws VisitorException{
- super.visitSplit(spl);
+ PhysicalPlan oldPlan = parent;
+ List<PhysicalPlan> plans = spl.getPlans();
+ for (PhysicalPlan plan : plans) {
+ parent = plan;
+ pushWalker(mCurrentWalker.spawnChildWalker(plan));
+ visit();
+ popWalker();
+ }
+ parent=oldPlan;
spl.setParentPlan(parent);
}
Modified: pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=1304153&r1=1304152&r2=1304153&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Fri Mar 23 01:04:36 2012
@@ -291,6 +291,9 @@ public class POSplit extends PhysicalOpe
}
int idx = processedSet.nextClearBit(0);
+ if (inpEOP && parentPlan.endOfAllInput) {
+ myPlans.get(idx).endOfAllInput = true;
+ }
PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
res = leaf.getNext(dummyTuple);
Modified: pig/branches/branch-0.10/test/org/apache/pig/test/TestStreamingLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/TestStreamingLocal.java?rev=1304153&r1=1304152&r2=1304153&view=diff
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/TestStreamingLocal.java (original)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/TestStreamingLocal.java Fri Mar 23 01:04:36 2012
@@ -20,6 +20,7 @@ package org.apache.pig.test;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import junit.framework.TestCase;
@@ -289,36 +290,45 @@ public class TestStreamingLocal extends
}
@Test
- public void testJoinTwoStreamingRelations()
+ // See PIG-2442
+ public void testTwoStreamingMultiStore()
throws Exception {
- ArrayList<String> list = new ArrayList<String>();
- for (int i=0; i<10000; i++) {
- list.add("A," + i);
- }
- File input = Util.createInputFile("tmp", "", list.toArray(new String[0]));
+ File input = File.createTempFile("tmp", "");
+ input.delete();
+ Util.createLocalInputFile(input.getAbsolutePath(), new String[] {"first", "second", "third"});
- // Expected results
- Tuple expected = TupleFactory.getInstance().newTuple(4);
- expected.set(0, "A");
- expected.set(1, 0);
- expected.set(2, "A");
- expected.set(3, 0);
+ File output1 = File.createTempFile("tmp", "");
+ output1.delete();
- pigServer.registerQuery("A = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
- PigStorage.class.getName() + "(',') as (a0, a1);");
- pigServer.registerQuery("B = stream A through `head -1` as (a0, a1);");
- pigServer.registerQuery("C = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
- PigStorage.class.getName() + "(',') as (a0, a1);");
- pigServer.registerQuery("D = stream C through `head -1` as (a0, a1);");
- pigServer.registerQuery("E = join B by a0, D by a0;");
+ File output2 = File.createTempFile("tmp", "");
+ output2.delete();
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = load '" + input.getAbsolutePath() + "';");
+ pigServer.registerQuery("B1 = stream A through `cat`;");
+ pigServer.registerQuery("B1 = foreach B1 generate $0;");
+ pigServer.registerQuery("STORE B1 INTO '" + output1.getAbsolutePath() + "' USING PigStorage();");
+ pigServer.registerQuery("B2 = STREAM B1 THROUGH `cat`;");
+ pigServer.registerQuery("STORE B2 INTO '" + output2.getAbsolutePath() + "' USING PigStorage();");
+
+ pigServer.executeBatch();
+
+ List<Tuple> list = Util.readFile2TupleList(output1.getAbsolutePath() + File.separator +
+ "part-m-00000", "\t");
+ assertTrue(list.get(0).get(0).equals("first"));
+ assertTrue(list.get(1).get(0).equals("second"));
+ assertTrue(list.get(2).get(0).equals("third"));
+
+ list = Util.readFile2TupleList(output2.getAbsolutePath() + File.separator +
+ "part-m-00000", "\t");
+ assertTrue(list.get(0).get(0).equals("first"));
+ assertTrue(list.get(1).get(0).equals("second"));
+ assertTrue(list.get(2).get(0).equals("third"));
+ }
+
+ @Test
+ public void testJoinTwoStreamingRelations() {
- Iterator<Tuple> iter = pigServer.openIterator("E");
- int count = 0;
- while (iter.hasNext()) {
- Assert.assertEquals(expected.toString(), iter.next().toString());
- count++;
- }
- Assert.assertTrue(count == 1);
}
@Test