Posted to commits@pig.apache.org by da...@apache.org on 2012/03/23 02:06:30 UTC

svn commit: r1304155 - in /pig/branches/branch-0.9: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ test/org/apache/pig/test/

Author: daijy
Date: Fri Mar 23 01:06:29 2012
New Revision: 1304155

URL: http://svn.apache.org/viewvc?rev=1304155&view=rev
Log:
PIG-2442: Multiple Stores in pig streaming causes infinite waiting

Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
    pig/branches/branch-0.9/test/org/apache/pig/test/TestStreamingLocal.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1304155&r1=1304154&r2=1304155&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Fri Mar 23 01:06:29 2012
@@ -22,6 +22,8 @@ Release 0.9.3 - Unreleased
 
 BUG FIXES
 
+PIG-2442: Multiple Stores in pig streaming causes infinite waiting (daijy)
+
 PIG-2609: e2e harness: make hdfs base path configurable (outside default.conf) (thw via daijy)
 
 PIG-2590: running ant tar and rpm targets on same copy of pig source results in problems (thejas)

Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1304155&r1=1304154&r2=1304155&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Fri Mar 23 01:06:29 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.util.List;
+
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
@@ -100,7 +102,15 @@ public class PhyPlanSetter extends PhyPl
     
     @Override
     public void visitSplit(POSplit spl) throws VisitorException{
-        super.visitSplit(spl);
+        PhysicalPlan oldPlan = parent;
+        List<PhysicalPlan> plans = spl.getPlans();
+        for (PhysicalPlan plan : plans) {
+            parent = plan;
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
+            popWalker();
+        }
+        parent=oldPlan;
         spl.setParentPlan(parent);
     }
     

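Context for the hunk above: PhyPlanSetter records, for each PhysicalOperator, the PhysicalPlan that contains it. The base visitSplit only visited the POSplit node itself, so operators sitting inside the split's nested inner plans (for example a POStream feeding a POStore) never had a parent plan set. The patched method walks every nested plan with a child walker, then restores the outer plan before tagging the POSplit. A minimal standalone sketch of that idiom, using hypothetical classes rather than Pig's real API:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-ins for PhysicalPlan / PhysicalOperator, only to show the walk.
    class Plan {
        final String name;
        final List<Op> ops = new ArrayList<>();
        Plan(String name) { this.name = name; }
    }

    class Op {
        final String name;
        Plan parentPlan;                                   // what setParentPlan() records
        final List<Plan> nestedPlans = new ArrayList<>();  // non-empty for split-like operators
        Op(String name) { this.name = name; }
    }

    class PlanSetter {
        private Plan current;                              // plays the role of PhyPlanSetter.parent

        void visit(Plan plan) {
            Plan saved = current;                          // remember the enclosing plan
            current = plan;
            for (Op op : plan.ops) {
                for (Plan inner : op.nestedPlans) {
                    visit(inner);                          // descend, like spawnChildWalker + visit()
                }
                op.parentPlan = current;                   // operator belongs to the plan we are in
            }
            current = saved;                               // restore, like parent = oldPlan
        }
    }

    public class PlanSetterDemo {
        public static void main(String[] args) {
            Plan outer = new Plan("outer");
            Op split = new Op("split");
            Plan inner = new Plan("inner");
            Op stream = new Op("stream");

            inner.ops.add(stream);
            split.nestedPlans.add(inner);
            outer.ops.add(split);

            new PlanSetter().visit(outer);
            System.out.println("split  -> " + split.parentPlan.name);   // outer
            System.out.println("stream -> " + stream.parentPlan.name);  // inner
        }
    }

The point mirrored from the patch is the save/descend/restore around the nested plans, so each operator ends up pointing at the plan it actually lives in.
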
Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=1304155&r1=1304154&r2=1304155&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Fri Mar 23 01:06:29 2012
@@ -291,6 +291,9 @@ public class POSplit extends PhysicalOpe
             } 
             
             int idx = processedSet.nextClearBit(0);
+            if (inpEOP) {
+                myPlans.get(idx).endOfAllInput = true;
+            }
             PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
             
             res = leaf.getNext(dummyTuple);

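Context for the hunk above: the nested plans inside a POSplit can end in a streaming operator, and streaming only drains the external command's remaining output once it is told that no more input will arrive. Before this fix the endOfAllInput flag stopped at the POSplit, so in a multi-store pipeline the inner POStream kept waiting for input that would never be declared finished; that is the "infinite waiting" in the bug title. The added lines forward the flag to whichever nested plan is about to be pulled. A toy illustration of why the flag matters, again with hypothetical classes rather than Pig's real API:

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Hypothetical stand-in for a streaming operator: it holds back its last output
    // record until endOfAllInput says the input side is finished.
    class StreamingOp {
        boolean endOfAllInput = false;                 // the flag POSplit now forwards
        private final Deque<String> buffered = new ArrayDeque<>();

        void feed(String record) { buffered.add(record + " (streamed)"); }

        // Returns the next output record, or null meaning "nothing yet, keep waiting".
        String next() {
            if (buffered.size() > 1 || (endOfAllInput && !buffered.isEmpty())) {
                return buffered.poll();
            }
            return null;                               // without the flag, this answer never changes
        }
    }

    public class EndOfInputDemo {
        public static void main(String[] args) {
            StreamingOp op = new StreamingOp();
            op.feed("first");
            op.feed("second");
            op.feed("third");

            System.out.println(op.next());   // first (streamed)
            System.out.println(op.next());   // second (streamed)
            System.out.println(op.next());   // null: the runner would keep polling forever

            op.endOfAllInput = true;         // what the patched POSplit now propagates
            System.out.println(op.next());   // third (streamed)
        }
    }

Without setting endOfAllInput the last buffered record is never released, which is the single-record analogue of the hung streaming job.
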
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestStreamingLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestStreamingLocal.java?rev=1304155&r1=1304154&r2=1304155&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestStreamingLocal.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestStreamingLocal.java Fri Mar 23 01:06:29 2012
@@ -20,6 +20,7 @@ package org.apache.pig.test;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import junit.framework.TestCase;
 
@@ -289,36 +290,40 @@ public class TestStreamingLocal extends 
     }
     
     @Test
-    public void testJoinTwoStreamingRelations() 
+    // See PIG-2442
+    public void testTwoStreamingMultiStore()
     throws Exception {
-        ArrayList<String> list = new ArrayList<String>();
-        for (int i=0; i<10000; i++) {
-            list.add("A," + i);
-        }
-        File input = Util.createInputFile("tmp", "", list.toArray(new String[0]));
-        
-        // Expected results
-        Tuple expected = TupleFactory.getInstance().newTuple(4);
-        expected.set(0, "A");
-        expected.set(1, 0);
-        expected.set(2, "A");
-        expected.set(3, 0);        
-        
-        pigServer.registerQuery("A = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
-                    PigStorage.class.getName() + "(',') as (a0, a1);");
-        pigServer.registerQuery("B = stream A through `head -1` as (a0, a1);");
-        pigServer.registerQuery("C = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
-                PigStorage.class.getName() + "(',') as (a0, a1);");
-        pigServer.registerQuery("D = stream C through `head -1` as (a0, a1);");
-        pigServer.registerQuery("E = join B by a0, D by a0;");
-        
-        Iterator<Tuple> iter = pigServer.openIterator("E");
-        int count = 0;
-        while (iter.hasNext()) {
-            Assert.assertEquals(expected.toString(), iter.next().toString());
-            count++;
-        }
-        Assert.assertTrue(count == 1);
+        File input = File.createTempFile("tmp", "");
+        input.delete();
+        Util.createLocalInputFile(input.getAbsolutePath(), new String[] {"first", "second", "third"});
+
+        File output1 = File.createTempFile("tmp", "");
+        output1.delete();
+
+        File output2 = File.createTempFile("tmp", "");
+        output2.delete();
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = load '" + input.getAbsolutePath() + "';");
+        pigServer.registerQuery("B1 = stream A through `cat`;");
+        pigServer.registerQuery("B1 = foreach B1 generate $0;");
+        pigServer.registerQuery("STORE B1 INTO '" + output1.getAbsolutePath() + "' USING PigStorage();");
+        pigServer.registerQuery("B2 =  STREAM B1 THROUGH `cat`;");
+        pigServer.registerQuery("STORE B2 INTO '" + output2.getAbsolutePath() + "' USING PigStorage();");
+
+        pigServer.executeBatch();
+
+        List<Tuple> list = Util.readFile2TupleList(output1.getAbsolutePath() + File.separator +
+                "part-m-00000", "\t");
+        assertTrue(list.get(0).get(0).equals("first"));
+        assertTrue(list.get(1).get(0).equals("second"));
+        assertTrue(list.get(2).get(0).equals("third"));
+
+        list = Util.readFile2TupleList(output2.getAbsolutePath() + File.separator +
+                "part-m-00000", "\t");
+        assertTrue(list.get(0).get(0).equals("first"));
+        assertTrue(list.get(1).get(0).equals("second"));
+        assertTrue(list.get(2).get(0).equals("third"));
     }
 
     @Test