You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this conversion.
Posted to commits@pig.apache.org by da...@apache.org on 2014/10/30 19:07:24 UTC

svn commit: r1635572 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java test/org/apache/pig/test/TestCollectedGroup.java

Author: daijy
Date: Thu Oct 30 18:07:24 2014
New Revision: 1635572

URL: http://svn.apache.org/r1635572
Log:
PIG-4166: Collected group drops last record when combined with merge join

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1635572&r1=1635571&r2=1635572&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 30 18:07:24 2014
@@ -105,6 +105,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4166: Collected group drops last record when combined with merge join (bridiver via daijy)
+
 PIG-2495: Using merge JOIN from a HBaseStorage produces an error (bridiver via daijy)
 
 PIG-4182: e2e tests Scripting_[1-12] fail on Windows (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1635572&r1=1635571&r2=1635572&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Oct 30 18:07:24 2014
@@ -124,18 +124,14 @@ public class POCollectedGroup extends Ph
     @Override
     public Result getNextTuple() throws ExecException {
 
-        // Since the output is buffered, we need to flush the last
-        // set of records when the close method is called by mapper.
-        if (this.parentPlan.endOfAllInput) {
-            return getStreamCloseResult();
-        }
-
         Result inp = null;
         Result res = null;
 
         while (true) {
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP) {
+                // Since the output is buffered, we need to flush the last
+                // set of records when the close method is called by mapper.
                 if (this.parentPlan.endOfAllInput) {
                     return getStreamCloseResult();
                 } else {

Modified: pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=1635572&r1=1635571&r2=1635572&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Thu Oct 30 18:07:24 2014
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
@@ -292,6 +293,43 @@ public class TestCollectedGroup {
         }
     }
 
+    @Test
+    public void testMapsideGroupWithMergeJoin() throws IOException{
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+            {
+            	pigServer.registerQuery("C = join A by id, B by id using 'merge';");
+                pigServer.registerQuery("D = group C by A::id using 'collected';");
+                pigServer.registerQuery("E = foreach D generate group, COUNT(C);");
+                Iterator<Tuple> iter = pigServer.openIterator("E");
+
+                while (iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+            	pigServer.registerQuery("F = join A by id, B by id;");
+                pigServer.registerQuery("G = group F by A::id;");
+                pigServer.registerQuery("H = foreach G generate group, COUNT(F);");
+                Iterator<Tuple> iter = pigServer.openIterator("H");
+
+                while (iter.hasNext()) {
+                    dbshj.add(iter.next());
+                }
+            }
+            Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+            Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
     public static class DummyCollectableLoader extends PigStorage implements CollectableLoadFunc{
 
         @Override