Posted to dev@pig.apache.org by "Brian Johnson (JIRA)" <ji...@apache.org> on 2014/09/11 22:29:33 UTC

[jira] [Commented] (PIG-4166) Collected group drops last record when combined with merge join

    [ https://issues.apache.org/jira/browse/PIG-4166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14130654#comment-14130654 ] 

Brian Johnson commented on PIG-4166:
------------------------------------

This probably affects the final record in every case, not only when the final two keys join, but I haven't tested that yet.

> Collected group drops last record when combined with merge join
> ---------------------------------------------------------------
>
>                 Key: PIG-4166
>                 URL: https://issues.apache.org/jira/browse/PIG-4166
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.12.0
>            Reporter: Brian Johnson
>
> If the final two keys in each relation join, the joined records never make it to the final output. POMergeJoin does a read-ahead, and POCollectedGroup doesn't call processInput() once this.parentPlan.endOfAllInput == true, so POMergeJoin is never invoked with endOfAllInput == true and never emits the final join. The patch below moves the flush in POCollectedGroup.getNextTuple() so that it happens only after processInput() has returned STATUS_EOP or STATUS_ERR.
> {code}
> diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> index c355d1d..8fd44fa 100644
> --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
>      @Override
>      public Result getNextTuple() throws ExecException {
>  
> -        // Since the output is buffered, we need to flush the last
> -        // set of records when the close method is called by mapper.
> -        if (this.parentPlan.endOfAllInput) {
> -            if (outputBag != null) {
> -                Tuple tup = mTupleFactory.newTuple(2);
> -                tup.set(0, prevKey);
> -                tup.set(1, outputBag);
> -                outputBag = null;
> -                return new Result(POStatus.STATUS_OK, tup);
> -            }
> -
> -            return new Result(POStatus.STATUS_EOP, null);
> -        }
> +        
>  
>          Result inp = null;
>          Result res = null;
>  
>          while (true) {
>              inp = processInput();
> +
>              if (inp.returnStatus == POStatus.STATUS_EOP ||
>                      inp.returnStatus == POStatus.STATUS_ERR) {
> -                break;
> +                // Since the output is buffered, we need to flush the last
> +                // set of records when the close method is called by mapper.
> +                if (this.parentPlan.endOfAllInput) {
> +                    if (outputBag != null) {
> +                        Tuple tup = mTupleFactory.newTuple(2);
> +                        tup.set(0, prevKey);
> +                        tup.set(1, outputBag);
> +                        outputBag = null;
> +                        return new Result(POStatus.STATUS_OK, tup);
> +                    }
> +
> +                    return new Result(POStatus.STATUS_EOP, null);
> +                } else
> +                    break;
>              }
>  
>              if (inp.returnStatus == POStatus.STATUS_NULL) {
> {code}
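
To make the ordering problem in the description easier to see, here is a small self-contained sketch of the interaction. It is a toy model, not Pig code: ReadAheadDemo, upstreamNext, groupNext and the pullBeforeFlush switch are hypothetical names, and the "read-ahead" is assumed to be a single held-back record that the upstream operator releases only when it is called after endOfAllInput has been set. With pullBeforeFlush == false the grouping step flushes its buffer as soon as endOfAllInput is set and never asks upstream again (the order before the patch); with pullBeforeFlush == true it keeps pulling and flushes only once upstream has nothing left (the order after the patch).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the interaction described in this issue. Not Pig code. */
class ReadAheadDemo {
    // Plays the role of parentPlan.endOfAllInput.
    private boolean endOfAllInput = false;

    // Upstream operator state (stand-in for a read-ahead operator such as POMergeJoin).
    private String attached; // record pushed in by the driver, not yet consumed
    private String held;     // record read ahead, not yet emitted

    // Downstream operator state (stand-in for a buffering operator such as POCollectedGroup).
    private final boolean pullBeforeFlush;
    private String prevKey;
    private int count;

    ReadAheadDemo(boolean pullBeforeFlush) {
        this.pullBeforeFlush = pullBeforeFlush;
    }

    /** Returns the next upstream key, or null when nothing can be emitted yet. */
    private String upstreamNext() {
        if (attached != null) {
            String out = held;  // emit the previously held record (null on the first call)
            held = attached;    // and hold the new one back
            attached = null;
            return out;
        }
        if (endOfAllInput && held != null) {
            String out = held;  // the held record is released only at end of input
            held = null;
            return out;
        }
        return null;
    }

    /** Emits the buffered group, or null if the buffer is empty. */
    private String flush() {
        if (prevKey == null) {
            return null;
        }
        String group = prevKey + " x" + count;
        prevKey = null;
        count = 0;
        return group;
    }

    /** Returns the next completed group, or null when none is available. */
    private String groupNext() {
        if (!pullBeforeFlush && endOfAllInput) {
            return flush(); // pre-patch order: flush without asking upstream again
        }
        while (true) {
            String key = upstreamNext();
            if (key == null) {
                // post-patch order: flush only once upstream has been drained
                return endOfAllInput ? flush() : null;
            }
            if (prevKey == null) {
                prevKey = key;
                count = 1;
            } else if (key.equals(prevKey)) {
                count++;
            } else {
                String group = prevKey + " x" + count;
                prevKey = key;
                count = 1;
                return group;
            }
        }
    }

    private void drainInto(List<String> out) {
        for (String g = groupNext(); g != null; g = groupNext()) {
            out.add(g);
        }
    }

    /** Pushes each key through the two-stage pipeline, then signals end of input. */
    static List<String> run(List<String> keys, boolean pullBeforeFlush) {
        ReadAheadDemo p = new ReadAheadDemo(pullBeforeFlush);
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            p.attached = k;
            p.drainInto(out);
        }
        p.endOfAllInput = true;
        p.drainInto(out);
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "a", "b");
        System.out.println("flush first: " + run(keys, false)); // [a x2]
        System.out.println("pull first:  " + run(keys, true));  // [a x2, b x1]
    }
}
```

In this model the "b" record is still held by the upstream stage when endOfAllInput is set, so the flush-first order loses it, while the pull-first order gives upstream the extra call it needs to release it.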



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)