You are viewing a plain-text version of this content. The canonical link to the original message is not reproduced in this text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2014/10/30 19:07:24 UTC
svn commit: r1635572 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
test/org/apache/pig/test/TestCollectedGroup.java
Author: daijy
Date: Thu Oct 30 18:07:24 2014
New Revision: 1635572
URL: http://svn.apache.org/r1635572
Log:
PIG-4166: Collected group drops last record when combined with merge join
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1635572&r1=1635571&r2=1635572&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 30 18:07:24 2014
@@ -105,6 +105,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4166: Collected group drops last record when combined with merge join (bridiver via daijy)
+
PIG-2495: Using merge JOIN from a HBaseStorage produces an error (bridiver via daijy)
PIG-4182: e2e tests Scripting_[1-12] fail on Windows (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1635572&r1=1635571&r2=1635572&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Oct 30 18:07:24 2014
@@ -124,18 +124,14 @@ public class POCollectedGroup extends Ph
@Override
public Result getNextTuple() throws ExecException {
- // Since the output is buffered, we need to flush the last
- // set of records when the close method is called by mapper.
- if (this.parentPlan.endOfAllInput) {
- return getStreamCloseResult();
- }
-
Result inp = null;
Result res = null;
while (true) {
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP) {
+ // Since the output is buffered, we need to flush the last
+ // set of records when the close method is called by mapper.
if (this.parentPlan.endOfAllInput) {
return getStreamCloseResult();
} else {
Modified: pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=1635572&r1=1635571&r2=1635572&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Thu Oct 30 18:07:24 2014
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
@@ -292,6 +293,43 @@ public class TestCollectedGroup {
}
}
+ @Test
+ public void testMapsideGroupWithMergeJoin() throws IOException{
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by id, B by id using 'merge';");
+ pigServer.registerQuery("D = group C by A::id using 'collected';");
+ pigServer.registerQuery("E = foreach D generate group, COUNT(C);");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while (iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("F = join A by id, B by id;");
+ pigServer.registerQuery("G = group F by A::id;");
+ pigServer.registerQuery("H = foreach G generate group, COUNT(F);");
+ Iterator<Tuple> iter = pigServer.openIterator("H");
+
+ while (iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
public static class DummyCollectableLoader extends PigStorage implements CollectableLoadFunc{
@Override