You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pig.apache.org by "Brian Johnson (JIRA)" <ji...@apache.org> on 2014/09/11 22:28:33 UTC

[jira] [Updated] (PIG-4166) Collected group drops last record when combined with merge join

     [ https://issues.apache.org/jira/browse/PIG-4166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brian Johnson updated PIG-4166:
-------------------------------
    Description: 
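If the final keys of the two relations match (i.e. they join), the resulting joined record never makes it to the final output. The cause is that POMergeJoin reads one record ahead, and it only emits its last pending join result when it is called with this.parentPlan.endOfAllInput == true. However, once endOfAllInput == true, POCollectedGroup flushes its buffered bag and returns EOP without calling processInput again. As a result, POMergeJoin never sees endOfAllInput == true, and its final join result is lost.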
{code}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
index c355d1d..8fd44fa 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
@@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
     @Override
     public Result getNextTuple() throws ExecException {
 
-        // Since the output is buffered, we need to flush the last
-        // set of records when the close method is called by mapper.
-        if (this.parentPlan.endOfAllInput) {
-            if (outputBag != null) {
-                Tuple tup = mTupleFactory.newTuple(2);
-                tup.set(0, prevKey);
-                tup.set(1, outputBag);
-                outputBag = null;
-                return new Result(POStatus.STATUS_OK, tup);
-            }
-
-            return new Result(POStatus.STATUS_EOP, null);
-        }
+        
 
         Result inp = null;
         Result res = null;
 
         while (true) {
             inp = processInput();
+
             if (inp.returnStatus == POStatus.STATUS_EOP ||
                     inp.returnStatus == POStatus.STATUS_ERR) {
-                break;
+               // Since the output is buffered, we need to flush the last
+                // set of records when the close method is called by mapper.
+                if (this.parentPlan.endOfAllInput) {
+                    if (outputBag != null) {
+                        Tuple tup = mTupleFactory.newTuple(2);
+                        tup.set(0, prevKey);
+                        tup.set(1, outputBag);
+                        outputBag = null;
+                        return new Result(POStatus.STATUS_OK, tup);
+                    }
+
+                    return new Result(POStatus.STATUS_EOP, null);
+                } else
+                       break;
             }
 
             if (inp.returnStatus == POStatus.STATUS_NULL) {
{code}

  was:
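If the final keys of the two relations match (i.e. they join), the resulting joined record never makes it to the final output. The cause is that POMergeJoin reads one record ahead, and it only emits its last pending join result when it is called with this.parentPlan.endOfAllInput == true. However, once endOfAllInput == true, POCollectedGroup flushes its buffered bag and returns EOP without calling processInput again. As a result, POMergeJoin never sees endOfAllInput == true, and its final join result is lost.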

diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
index c355d1d..8fd44fa 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
@@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
     @Override
     public Result getNextTuple() throws ExecException {
 
-        // Since the output is buffered, we need to flush the last
-        // set of records when the close method is called by mapper.
-        if (this.parentPlan.endOfAllInput) {
-            if (outputBag != null) {
-                Tuple tup = mTupleFactory.newTuple(2);
-                tup.set(0, prevKey);
-                tup.set(1, outputBag);
-                outputBag = null;
-                return new Result(POStatus.STATUS_OK, tup);
-            }
-
-            return new Result(POStatus.STATUS_EOP, null);
-        }
+        
 
         Result inp = null;
         Result res = null;
 
         while (true) {
             inp = processInput();
+
             if (inp.returnStatus == POStatus.STATUS_EOP ||
                     inp.returnStatus == POStatus.STATUS_ERR) {
-                break;
+               // Since the output is buffered, we need to flush the last
+                // set of records when the close method is called by mapper.
+                if (this.parentPlan.endOfAllInput) {
+                    if (outputBag != null) {
+                        Tuple tup = mTupleFactory.newTuple(2);
+                        tup.set(0, prevKey);
+                        tup.set(1, outputBag);
+                        outputBag = null;
+                        return new Result(POStatus.STATUS_OK, tup);
+                    }
+
+                    return new Result(POStatus.STATUS_EOP, null);
+                } else
+                       break;
             }
 
             if (inp.returnStatus == POStatus.STATUS_NULL) {


> Collected group drops last record when combined with merge join
> ---------------------------------------------------------------
>
>                 Key: PIG-4166
>                 URL: https://issues.apache.org/jira/browse/PIG-4166
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.12.0
>            Reporter: Brian Johnson
>
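> If the final keys of the two relations match (i.e. they join), the resulting joined record never makes it to the final output. The cause is that POMergeJoin reads one record ahead, and it only emits its last pending join result when it is called with this.parentPlan.endOfAllInput == true. However, once endOfAllInput == true, POCollectedGroup flushes its buffered bag and returns EOP without calling processInput again. As a result, POMergeJoin never sees endOfAllInput == true, and its final join result is lost.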
> {code}
> diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> index c355d1d..8fd44fa 100644
> --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
>      @Override
>      public Result getNextTuple() throws ExecException {
>  
> -        // Since the output is buffered, we need to flush the last
> -        // set of records when the close method is called by mapper.
> -        if (this.parentPlan.endOfAllInput) {
> -            if (outputBag != null) {
> -                Tuple tup = mTupleFactory.newTuple(2);
> -                tup.set(0, prevKey);
> -                tup.set(1, outputBag);
> -                outputBag = null;
> -                return new Result(POStatus.STATUS_OK, tup);
> -            }
> -
> -            return new Result(POStatus.STATUS_EOP, null);
> -        }
> +        
>  
>          Result inp = null;
>          Result res = null;
>  
>          while (true) {
>              inp = processInput();
> +
>              if (inp.returnStatus == POStatus.STATUS_EOP ||
>                      inp.returnStatus == POStatus.STATUS_ERR) {
> -                break;
> +               // Since the output is buffered, we need to flush the last
> +                // set of records when the close method is called by mapper.
> +                if (this.parentPlan.endOfAllInput) {
> +                    if (outputBag != null) {
> +                        Tuple tup = mTupleFactory.newTuple(2);
> +                        tup.set(0, prevKey);
> +                        tup.set(1, outputBag);
> +                        outputBag = null;
> +                        return new Result(POStatus.STATUS_OK, tup);
> +                    }
> +
> +                    return new Result(POStatus.STATUS_EOP, null);
> +                } else
> +                       break;
>              }
>  
>              if (inp.returnStatus == POStatus.STATUS_NULL) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)