Posted to dev@pig.apache.org by "Pradeep Kamath (JIRA)" <ji...@apache.org> on 2009/01/30 02:48:59 UTC

[jira] Updated: (PIG-645) Streaming is broken with the latest trunk

     [ https://issues.apache.org/jira/browse/PIG-645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pradeep Kamath updated PIG-645:
-------------------------------

        Fix Version/s: types_branch
    Affects Version/s: types_branch
               Status: Patch Available  (was: Open)

Attached a patch to fix the issues with streaming. The root cause was that the changes introduced by PIG-629 (PERFORMANCE: Eliminate use of TargetedTuple for each input tuple in the map()) created a race condition on the input tuple in map() between recordReader.next(Tuple value) and the streaming binary.

BEFORE PIG-629, the flow of a tuple from record reader to map was as follows:
The recordReader instance gets the *same* TargetedTuple object reference in every next(TargetedTuple value) call, because Hadoop reuses the value object for each recordReader.next(value) call. The recordReader.next(value) call in turn calls PigSlice.next(Tuple value), which has the following implementation:
{code}
public boolean next(Tuple value) throws IOException {
    Tuple t = loader.getNext();
    if (t == null) {
        return false;
    }
    value.reference(t);
    return true;
}
{code}
Here value.reference(t) calls the TargetedTuple.reference(Tuple) method, which simply stores the supplied tuple in its member Tuple variable "t".

In PigMapBase.map(), the toTuple() method is called on the input TargetedTuple, which returns the stored tuple reference "t". This reference is then attached to the roots of the map plan.

The point to note is that the tuple reference finally used by the operators in the map plan is the reference to the tuple returned from the loader, not the reference to the TargetedTuple which we get from the recordReader and which is supplied as an argument to the map() call. The loader creates a new tuple reference on each getNext(). This guarantees that the operators in the map plan always work with a different tuple reference on each map() call, even though the TargetedTuple reference supplied to map() is the same one and is reused by Hadoop.
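
To make the pre-PIG-629 behaviour concrete, here is a minimal single-threaded sketch. LoaderTuple, HolderTuple and HolderFlowDemo are hypothetical stand-ins invented for illustration (they are not the real Pig classes), and the second reference() call plays the role of Hadoop calling recordReader.next(value) again on the reused object:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a tuple produced by the loader; not the real Pig Tuple.
final class LoaderTuple {
    final List<Object> fields;

    LoaderTuple(Object... values) {
        this.fields = Arrays.asList(values);
    }
}

// Hypothetical stand-in for TargetedTuple: it only remembers the wrapped tuple.
final class HolderTuple {
    private LoaderTuple t;

    void reference(LoaderTuple tuple) {
        this.t = tuple;
    }

    LoaderTuple toTuple() {
        return t;
    }
}

final class HolderFlowDemo {
    // Returns the fields seen by a consumer that reads only after the reader has advanced.
    static List<Object> readAfterAdvance() {
        HolderTuple value = new HolderTuple();     // the single object the framework reuses
        value.reference(new LoaderTuple("a", 1));  // first next(value)
        LoaderTuple attached = value.toTuple();    // map() attaches the inner tuple to the plan
        value.reference(new LoaderTuple("b", 2));  // second next(value) arrives early
        return attached.fields;                    // still the first record
    }

    public static void main(String[] args) {
        System.out.println(readAfterAdvance());    // prints [a, 1]
    }
}
```

Because the plan holds the loader's tuple rather than the reused holder, advancing the reader does not disturb a consumer that is still reading the earlier record.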

AFTER PIG-629, the flow changed as follows:
TargetedTuple was removed and Tuple was used instead. The PigSlice.next(Tuple value) code remained intact. However, the DefaultTuple.reference(Tuple) call in it assigns the internal mFields ArrayList to the ArrayList of the supplied tuple. Note that here the internal member ArrayList of the DefaultTuple is changed to "refer" to the internal ArrayList of the Tuple the loader gives.
In map(), the tuple which is supplied as the input argument to the map() call is attached directly to the roots. So in the case of streaming, this tuple is finally supplied to the binary by using a storage function (PigStorage by default). However, this tuple reference is the same one that gets reused by Hadoop in the next recordReader.next(value) call. So while the storage function is in the process of writing the current Tuple's contents (the mFields ArrayList), the contents can change underneath it due to the recordReader.next(value) call. Unless the storage function writes to the binary's stdin BEFORE the next recordReader.next(value) call, the input sent to the binary will be garbled.
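
The aliasing described above can be reproduced deterministically without threads. The sketch below is only an illustration under stated assumptions: SketchTuple is a hypothetical simplification of DefaultTuple whose reference() adopts the other tuple's list as described in this comment, and the second reference() call stands in for an early recordReader.next(value):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simplification of DefaultTuple; not the real Pig class.
final class SketchTuple {
    private List<Object> fields;

    SketchTuple(Object... values) {
        this.fields = new ArrayList<>(Arrays.asList(values));
    }

    // Mirrors the behaviour described above: point our list at the other tuple's list.
    void reference(SketchTuple other) {
        this.fields = other.fields;
    }

    List<Object> getAll() {
        return fields;
    }
}

final class ReuseHazardDemo {
    // Returns the fields seen by a consumer that kept the reused object itself.
    static List<Object> readAfterAdvance() {
        SketchTuple value = new SketchTuple();     // the single object Hadoop reuses
        value.reference(new SketchTuple("a", 1));  // first next(value)
        SketchTuple attached = value;              // map() attaches the input tuple directly
        value.reference(new SketchTuple("b", 2));  // second next(value) before the write finished
        return attached.getAll();                  // the first record is no longer visible
    }

    public static void main(String[] args) {
        System.out.println(readAfterAdvance());    // prints [b, 2]
    }
}
```

In the real job the swap happens concurrently with the write to the streaming binary, so the consumer may see a mix of both records rather than a clean replacement.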

The fix is the following one line change:
{noformat}
         for (PhysicalOperator root : roots) {
-            root.attachInput(inpTuple);
+            root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
         }

{noformat}

In map(), instead of attaching inpTuple directly to the roots of the plan, a new Tuple is created which refers to the same mFields ArrayList as inpTuple. With this change, all operators in the map plan now work on a different Tuple reference from the one which is supplied in the map() argument (and which is reused by Hadoop). This reference will refer to the mFields of the Tuple returned from the loader, which is guaranteed to be a new ArrayList for each input tuple since the loader creates a new Tuple each time.
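
The effect of the one-line change can be sketched in the same simplified model. WrapperTuple and NoCopyFixDemo are hypothetical names, and the WrapperTuple(List) constructor is only an assumed analogue of tf.newTupleNoCopy(inpTuple.getAll()): it wraps the existing list without copying it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simplification of DefaultTuple; not the real Pig class.
final class WrapperTuple {
    private List<Object> fields;

    // Wraps the given list without copying it (assumed analogue of newTupleNoCopy).
    WrapperTuple(List<Object> fields) {
        this.fields = fields;
    }

    // Point our list at the other tuple's list, as described for reference().
    void reference(WrapperTuple other) {
        this.fields = other.fields;
    }

    List<Object> getAll() {
        return fields;
    }
}

final class NoCopyFixDemo {
    // Builds a loader-style tuple that owns a fresh list.
    static WrapperTuple loaderTuple(Object... values) {
        return new WrapperTuple(new ArrayList<>(Arrays.asList(values)));
    }

    // Returns the fields seen by a consumer holding the new wrapper.
    static List<Object> readAfterAdvance() {
        WrapperTuple value = new WrapperTuple(new ArrayList<>());   // reused by Hadoop
        value.reference(loaderTuple("a", 1));                       // first next(value)
        WrapperTuple attached = new WrapperTuple(value.getAll());   // the fix: fresh wrapper, same list
        value.reference(loaderTuple("b", 2));                       // second next(value) arrives early
        return attached.getAll();                                   // still the first record
    }

    public static void main(String[] args) {
        System.out.println(readAfterAdvance());                     // prints [a, 1]
    }
}
```

The new wrapper costs one small object per record but no copy of the field list, which is consistent with the intent of PIG-629 to avoid per-tuple overhead.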

> Streaming is broken with the latest trunk
> -----------------------------------------
>
>                 Key: PIG-645
>                 URL: https://issues.apache.org/jira/browse/PIG-645
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: types_branch
>            Reporter: Olga Natkovich
>            Assignee: Pradeep Kamath
>             Fix For: types_branch
>
>         Attachments: PIG-645.patch
>
>
> Several tests we run are failing now
