You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pig.apache.org by "Pradeep Kamath (JIRA)" <ji...@apache.org> on 2009/05/12 20:19:45 UTC

[jira] Created: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
------------------------------------------------------------------------------------------------

                 Key: PIG-807
                 URL: https://issues.apache.org/jira/browse/PIG-807
             Project: Pig
          Issue Type: Improvement
    Affects Versions: 0.2.1
            Reporter: Pradeep Kamath
             Fix For: 0.3.0


Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "Yiping Han (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12710818#action_12710818 ] 

Yiping Han commented on PIG-807:
--------------------------------

David, the syntax: B = foreach A generate SUM(m), is confusing for both developers and the parser.

I like the idea to remove the explicit GROUP ALL, but would rather to use a different key word for that. I.e., B = FOR A GENERATE SUM(m);

Adding a new keyword for this purpose would also works as the hint for parser to treat this as a direct hadoop iterator access.

> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "David Ciemiewicz (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12709007#action_12709007 ] 

David Ciemiewicz commented on PIG-807:
--------------------------------------

Certainly SUM, COUNT, AVG could all use this.

In fact, technically speaking, SUM, COUNT, and AVG shouldn't even necessarily need a prior "GROUP ... ALL" statement.  How would this factor into the thinking on this?

While you're thinking about this, we might also consider another optimization as well ... what if I have 10 to 100 SUM operations in the same FOREACH ... GENERATE statement.

Materializing a DataBag or even a ReadOnce Bag for each column of data is REALLY slow.  In working through this, providing access to the underlying hadoop iterators permit a single scan through the data rather than multiple scans, one for each column?

Example:

{code}
A = load ...

B = group A all;

C = foreach B generate
COUNT(A),
SUM(A.m),
SUM(A.n),
SUM(A.o),
SUM(A.p),
SUM(A.q),
SUM(A.r),
...
{code}

> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "Yiping Han (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12708575#action_12708575 ] 

Yiping Han commented on PIG-807:
--------------------------------

I would say instead of annotating the UDF to indicate ""read once" bags, it would be easier to do that in the co-group command. We would skip bag materialization only if it is accessed by UDFs that ALL read it in the "read once" manner. Thus we only need to specify that once. 


> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "Olga Natkovich (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Olga Natkovich updated PIG-807:
-------------------------------

    Fix Version/s: 0.6.0
         Assignee: Ying He

> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>            Assignee: Ying He
>             Fix For: 0.6.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Resolved: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "Olga Natkovich (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Olga Natkovich resolved PIG-807.
--------------------------------

    Resolution: Won't Fix

accumulator interface has been introduced for UDFs to solve this issue

> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "David Ciemiewicz (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12714231#action_12714231 ] 

David Ciemiewicz commented on PIG-807:
--------------------------------------

I wonder if there is also the need for some additional classes of functions that go along with ReadOnce / Streaming applications:
Accumulating Functions that operated on ordered data and output a tuple for each and every tuple read.

For instance, cummulative sums, rank, dense rank, cumulative proportions all could be written Accumulating Functions that operate on streams.

>From my Perl example above, cummulative sum would be a function that does:

{code}
sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'sum'} += $value;

        return $self->{'sum'};
}
{code}

These kinds of functions would be different from the SUM, COUNT, MIN, MAX, .. Accumulating functions.

I think that any designs / redesigns of Pig to support ReadOnce data should also include consideration for these kinds of cumulative sum type functions as well.

> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "Mridul Muralidharan (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12710901#action_12710901 ] 

Mridul Muralidharan commented on PIG-807:
-----------------------------------------

I think I am missing something here.


If I did not get it wrong, two (different ?) usecases seem to be mentioned here ?


1) Avoid materializing bag's for a record when it can be streamed from the underlying data.
bag's currently created through (co)group output seems to fall inside this.
As in :
B = GROUP A by id;
C = FOREACH B generate SUM($1.field);

This does not reqiure the $1.field bag to be created explicitly - but through an iterator interface, just stream the values from underlying reducer output.

2) The group ALL based construct seem to be to directly stream an entire relation through udf's.
As a shorthand for 
A_tmp = GROUP A all;
B = FOREACH A_tmp GENERATE algUdf($1);



If I am right in splitting this, then :

First usecase has tremendous potential for improving performance - particularly to remove the annoying OOM's or spills which happen : but not sure how it interact with pig's current pipeline design... (if any).


Since there are alternatives (though more cryptic) to do it, I dont have any particular opinion about 2.

Regards,
Mridul

> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "David Ciemiewicz (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12714227#action_12714227 ] 

David Ciemiewicz commented on PIG-807:
--------------------------------------

@Yiping

I see what you mean.  Maybe we should have FOREACH and FORALL as in B = FORALL A GENERATE SUM(m);

Another version of this my be B = OVER A GENERATE SUM(m); or B = OVERALL A GENERATE SUM(m);


There was a hallway conversation about the situation of:

{code}
B = GROUP A BY key;
C = FOREACH B {
        SORTED = ORDER A BY value;
        GENERATE
                COUNT(SORTED) as count,
                QUANTILES(SORTED.value, 0.0, 0.5, 0.75, 0.9, 1.0) as quantiles: (p00, p50, p75, p90, p100);
        };
{code}

I was told that a ReadOnce bag would not solve this problem because we'd need to pass through SORTED twice because there were two UDFs.

I disagree.  It is possible to pass over this data once and only once if we create a class of Accumulating or Running functions that differs from the current DataBag and AlgebraicDataBag functions.

First, functions like SUM, COUNT, AVG, VAR, MIN, MAX, STDEV, ResevoirSampling, statistics.SUMMARY, can all computed on a ReadOnce / Streaming DataBag of unknown length or size.  For each of these functions, we simply "add" or "accumulate"  the values on row at a time, we can invoke a combiner for intermediate results across partitions, and produce a final result, all without materializing a DataBag as implemented today.

QUANTILES is a different beast.  To compute quantiles, the data must be sorted, which I prefer to do outside the UDF at this time.  Also, the COUNT of the data is needed a prior.  Fortunately sorting COULD produce a ReadOnce / Streaming DataBag of KNOWN as opposed to unknown length or size so only two scans through the data (sorting and quantiles) are needed without needing three scans (sort, count, quantiles).

So, if Pig could understand two additional data types

ReadOnceSizeUnknown -- COUNT() counts all individual rows
ReadOnceSizeKnown -- COUNT() just returns size attribute of ReadOnce data reference

And if Pig had RunningEval and RunningAlgebraicEval classes of functions which accumulate values a row at a time, many computations in Pig could be much much more efficient.

In case anyone doesn't "get" what I mean by having running functions, here's some Perl code that implements what I'm suggesting. I'll leave it as an exercise for the Pig development team to figure out the RunningAlgebraicEval versions of these functions/classes. :^)

runningsums.pl
{code}
#! /usr/bin/perl

use RunningSum;
use RunningCount;

$a_count = RunningCount->new();
$a_sum = RunningSum->new();
$b_sum = RunningSum->new();
$c_sum = RunningSum->new();

while (<>)
{
        s/\r*\n*//g;

        ($a, $b, $c) = split(/\t/);

        $a_count->accumulate($a);
        $a_sum->accumulate($a);
        $b_sum->accumulate($b);
        $c_sum->accumulate($c);
}

print join("\t",
        $a_count->final(),
        $a_sum->final(),
        $b_sum->final(),
        $c_sum->final()
        ), "\n";
{code}

RunningCount.pm
{code}
package RunningCount;

sub new
{
        my $class = shift;
        my $self = {};
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'count'} ++;
}

sub final
{
        my $self = shift;
        return $self->{'count'};
}

1;
{code}

RunningSum.pl
{code}
package RunningSum;

sub new
{
        my $class = shift;
        my $self = {};
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'sum'} += $value;
}

sub final
{
        my $self = shift;
        return $self->{'sum'};
}

1;
{code}








> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "Bee-Chung Chen (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12709152#action_12709152 ] 

Bee-Chung Chen commented on PIG-807:
------------------------------------

It would be great if Pig can support ReadOnce bags.

It looks like the current memory mangement (the spill mechanism) depends highly on JVM which seems to be unstable especially when it gets close to the memory limit.  Is there any plan to implement a more rigorous buffer manger like those in database engines?


> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "David Ciemiewicz (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12710798#action_12710798 ] 

David Ciemiewicz commented on PIG-807:
--------------------------------------

Going back to my early days of Pig and the confusion / aversion people have to "group all", if we head ReadOnce bags, there would often be absolutely no need for "group all" or any form of mapping step. 

In other words, it would really be useful if people could just write the following summary code without using an intervening group ... all.

It would make it easier for people to transition from SQL to Pig.  It make Pig a little bit higher level language than it is.

{code} 
A = load ... 

B = foreach A generate 
COUNT(*), 
SUM(m), 
SUM(n), 
SUM(o), 
SUM(p). 
SUM(q), 
SUM(r), 
... 
{code} 

> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Issue Comment Edited: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)

Posted by "David Ciemiewicz (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12714227#action_12714227 ] 

David Ciemiewicz edited comment on PIG-807 at 5/28/09 4:36 PM:
---------------------------------------------------------------

@Yiping

I see what you mean.  Maybe we should have FOREACH and FORALL as in B = FORALL A GENERATE SUM(m);

Another version of this my be B = OVER A GENERATE SUM(m); or B = OVERALL A GENERATE SUM(m);


There was a hallway conversation about the situation of:

{code}
B = GROUP A BY key;
C = FOREACH B {
        SORTED = ORDER A BY value;
        GENERATE
                COUNT(SORTED) as count,
                QUANTILES(SORTED.value, 0.0, 0.5, 0.75, 0.9, 1.0) as quantiles: (p00, p50, p75, p90, p100);
        };
{code}

I was told that a ReadOnce bag would not solve this problem because we'd need to pass through SORTED twice because there were two UDFs.

I disagree.  It is possible to pass over this data once and only once if we create a class of Accumulating or Running functions that differs from the current DataBag Eval and AlgebraicEval functions.

First, functions like SUM, COUNT, AVG, VAR, MIN, MAX, STDEV, ResevoirSampling, statistics.SUMMARY, can all computed on a ReadOnce / Streaming DataBag of unknown length or size.  For each of these functions, we simply "add" or "accumulate"  the values on row at a time, we can invoke a combiner for intermediate results across partitions, and produce a final result, all without materializing a DataBag as implemented today.

QUANTILES is a different beast.  To compute quantiles, the data must be sorted, which I prefer to do outside the UDF at this time.  Also, the COUNT of the data is needed a prior.  Fortunately sorting COULD produce a ReadOnce / Streaming DataBag of KNOWN as opposed to unknown length or size so only two scans through the data (sorting and quantiles) are needed without needing three scans (sort, count, quantiles).

So, if Pig could understand two additional data types

ReadOnceSizeUnknown -- COUNT() counts all individual rows
ReadOnceSizeKnown -- COUNT() just returns size attribute of ReadOnce data reference

And if Pig had RunningEval and RunningAlgebraicEval classes of functions which accumulate values a row at a time, many computations in Pig could be much much more efficient.

In case anyone doesn't "get" what I mean by having running functions, here's some Perl code that implements what I'm suggesting. I'll leave it as an exercise for the Pig development team to figure out the RunningAlgebraicEval versions of these functions/classes. :^)

runningsums.pl
{code}
#! /usr/bin/perl

use RunningSum;
use RunningCount;

$a_count = RunningCount->new();
$a_sum = RunningSum->new();
$b_sum = RunningSum->new();
$c_sum = RunningSum->new();

while (<>)
{
        s/\r*\n*//g;

        ($a, $b, $c) = split(/\t/);

        $a_count->accumulate($a);
        $a_sum->accumulate($a);
        $b_sum->accumulate($b);
        $c_sum->accumulate($c);
}

print join("\t",
        $a_count->final(),
        $a_sum->final(),
        $b_sum->final(),
        $c_sum->final()
        ), "\n";
{code}

RunningCount.pm
{code}
package RunningCount;

sub new
{
        my $class = shift;
        my $self = {};
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'count'} ++;
}

sub final
{
        my $self = shift;
        return $self->{'count'};
}

1;
{code}

RunningSum.pl
{code}
package RunningSum;

sub new
{
        my $class = shift;
        my $self = {};
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'sum'} += $value;
}

sub final
{
        my $self = shift;
        return $self->{'sum'};
}

1;
{code}








      was (Author: ciemo):
    @Yiping

I see what you mean.  Maybe we should have FOREACH and FORALL as in B = FORALL A GENERATE SUM(m);

Another version of this my be B = OVER A GENERATE SUM(m); or B = OVERALL A GENERATE SUM(m);


There was a hallway conversation about the situation of:

{code}
B = GROUP A BY key;
C = FOREACH B {
        SORTED = ORDER A BY value;
        GENERATE
                COUNT(SORTED) as count,
                QUANTILES(SORTED.value, 0.0, 0.5, 0.75, 0.9, 1.0) as quantiles: (p00, p50, p75, p90, p100);
        };
{code}

I was told that a ReadOnce bag would not solve this problem because we'd need to pass through SORTED twice because there were two UDFs.

I disagree.  It is possible to pass over this data once and only once if we create a class of Accumulating or Running functions that differs from the current DataBag and AlgebraicDataBag functions.

First, functions like SUM, COUNT, AVG, VAR, MIN, MAX, STDEV, ResevoirSampling, statistics.SUMMARY, can all computed on a ReadOnce / Streaming DataBag of unknown length or size.  For each of these functions, we simply "add" or "accumulate"  the values on row at a time, we can invoke a combiner for intermediate results across partitions, and produce a final result, all without materializing a DataBag as implemented today.

QUANTILES is a different beast.  To compute quantiles, the data must be sorted, which I prefer to do outside the UDF at this time.  Also, the COUNT of the data is needed a prior.  Fortunately sorting COULD produce a ReadOnce / Streaming DataBag of KNOWN as opposed to unknown length or size so only two scans through the data (sorting and quantiles) are needed without needing three scans (sort, count, quantiles).

So, if Pig could understand two additional data types

ReadOnceSizeUnknown -- COUNT() counts all individual rows
ReadOnceSizeKnown -- COUNT() just returns size attribute of ReadOnce data reference

And if Pig had RunningEval and RunningAlgebraicEval classes of functions which accumulate values a row at a time, many computations in Pig could be much much more efficient.

In case anyone doesn't "get" what I mean by having running functions, here's some Perl code that implements what I'm suggesting. I'll leave it as an exercise for the Pig development team to figure out the RunningAlgebraicEval versions of these functions/classes. :^)

runningsums.pl
{code}
#! /usr/bin/perl

use RunningSum;
use RunningCount;

$a_count = RunningCount->new();
$a_sum = RunningSum->new();
$b_sum = RunningSum->new();
$c_sum = RunningSum->new();

while (<>)
{
        s/\r*\n*//g;

        ($a, $b, $c) = split(/\t/);

        $a_count->accumulate($a);
        $a_sum->accumulate($a);
        $b_sum->accumulate($b);
        $c_sum->accumulate($c);
}

print join("\t",
        $a_count->final(),
        $a_sum->final(),
        $b_sum->final(),
        $c_sum->final()
        ), "\n";
{code}

RunningCount.pm
{code}
package RunningCount;

sub new
{
        my $class = shift;
        my $self = {};
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'count'} ++;
}

sub final
{
        my $self = shift;
        return $self->{'count'};
}

1;
{code}

RunningSum.pl
{code}
package RunningSum;

sub new
{
        my $class = shift;
        my $self = {};
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'sum'} += $value;
}

sub final
{
        my $self = shift;
        return $self->{'sum'};
}

1;
{code}







  
> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing all of the contents. The issue with this is that if a particular key has many corresponding values, all these values get stuffed in a bag which may run out of memory and hence spill causing slow down in performance and sometime memory exceptions. In many cases, the udfs which use these bags coming out a group and cogroup only need to iterate over the bag in a unidirectional read-once manner. This can be implemented by having the bag implement its iterator by simply iterating over the underlying hadoop iterator provided in the reduce. This kind of a bag is also needed in http://issues.apache.org/jira/browse/PIG-802. So the code can be reused for this issue too. The other part of this issue is to have some way for the udfs to communicate to Pig that any input bags that they need are "read once" bags . This can be achieved by having an Interface - say "UsesReadOnceBags " which is serves as a tag to indicate the intent to Pig. Pig can then rewire its execution plan to use ReadOnceBags is feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.