You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@pig.apache.org by Daan Gerits <da...@gmail.com> on 2011/11/09 14:34:23 UTC

Multithreaded UDF

Hello,

First of all, great job creating pig, really a magnificent piece of software.

I do have a few questions about UDFs. I have a dataset with a list of url's I want to fetch. Since an EvalFunc can only process one tuple at a time and the asynchronous abilities of the UDF are deprecated, I can only fetch one url at a time. The problem is that fetching this one url takes a reasonable amount of time (1 to 5 seconds, there is a delay built in) so that really slows down the processing. I already converted the UDF into an Accumulator but that only seems to get fired after a group by. If would be nice to have some kind of Queue UDF which will queue the tuples until a certain amount is reached and than flushes the queue. That way I can add tuples to an internal list and on flush start multiple threads to go through the list of Tuples.

This is a workaround though, since the best solution would be to reintroduce the asynchronous UDF's (in which case I can schedule the threads as the tuples come in)

Any idea's on this? I already saw someone trying almost the same thing, but didn't get a definite answer from that one.

An other option is to increase the number of reducer slots on the cluster, but I'm afraid that would mean we overload the nodes in case of a heavy reduce phase.

Best Regards,

Daan

Re: Multithreaded UDF

Posted by Alan Gates <ga...@hortonworks.com>.
On Nov 9, 2011, at 12:52 PM, Jonathan Coveney wrote:

> Alan, do you mean overriding finalize(), or does EvalFunc have a finish()
> that I didn't realize existed?

EvalFunc has a finish() method which is called by Mapper's or Reducer's cleanup method.

Alan.


Re: Multithreaded UDF

Posted by Jonathan Coveney <jc...@gmail.com>.
Alan, do you mean overriding finalize(), or does EvalFunc have a finish()
that I didn't realize existed?

Daan: I think at this point it might be worth stepping back and explaining
a bit more about what you want to do. At this point, you want to fetch
urls, and then "do work." Is fetching url the only io that the evalfuncs
have to do? How long does fetching a URL typically take? What sort of work
do you want to do based on the url? And lastly, what specifically is
happening that you think multithreaded would be good? Is there blocking io?

2011/11/9 Mridul Muralidharan <mr...@yahoo-inc.com>

>
> A simple solution would be to tag each tuple with a random number (such
> that each number has multiple url's associated with it - but not too large
> a number of urls), and simply group based on this field.
> In the reducer, you get a bag of url's for each random number : at which
> point, you can use multiple threads to fetch content and associate their
> responses with the appropriate input tuple.
>
>
> You only need to ensure that :
> a) Too many tuples dont get associated with a single random number (to the
> extent that it causes spills to disk).
>
> b) Too few tuples dont get associated over all random numbers you use -
> else it degenerates to current case.
>
> c) You seed the random number sensible, in order not to hit problems with
> having your tasks being non-repeatable.
>
> Regards,
> Mridul
>
>
> On Wednesday 09 November 2011 07:04 PM, Daan Gerits wrote:
>
>> Hello,
>>
>> First of all, great job creating pig, really a magnificent piece of
>> software.
>>
>> I do have a few questions about UDFs. I have a dataset with a list of
>> url's I want to fetch. Since an EvalFunc can only process one tuple at a
>> time and the asynchronous abilities of the UDF are deprecated, I can only
>> fetch one url at a time. The problem is that fetching this one url takes a
>> reasonable amount of time (1 to 5 seconds, there is a delay built in) so
>> that really slows down the processing. I already converted the UDF into an
>> Accumulator but that only seems to get fired after a group by. If would be
>> nice to have some kind of Queue UDF which will queue the tuples until a
>> certain amount is reached and than flushes the queue. That way I can add
>> tuples to an internal list and on flush start multiple threads to go
>> through the list of Tuples.
>>
>> This is a workaround though, since the best solution would be to
>> reintroduce the asynchronous UDF's (in which case I can schedule the
>> threads as the tuples come in)
>>
>> Any idea's on this? I already saw someone trying almost the same thing,
>> but didn't get a definite answer from that one.
>>
>> An other option is to increase the number of reducer slots on the
>> cluster, but I'm afraid that would mean we overload the nodes in case of a
>> heavy reduce phase.
>>
>> Best Regards,
>>
>> Daan
>>
>
>

Re: Multithreaded UDF

Posted by Raghu Angadi <an...@gmail.com>.
oh, this is much better than custom loader hack I mentioned to batch up
input tuples.

On Wed, Nov 9, 2011 at 12:22 PM, Mridul Muralidharan
<mr...@yahoo-inc.com>wrote:

>
> A simple solution would be to tag each tuple with a random number (such
> that each number has multiple url's associated with it - but not too large
> a number of urls), and simply group based on this field.
> In the reducer, you get a bag of url's for each random number : at which
> point, you can use multiple threads to fetch content and associate their
> responses with the appropriate input tuple.
>
>
> You only need to ensure that :
> a) Too many tuples dont get associated with a single random number (to the
> extent that it causes spills to disk).
>
> b) Too few tuples dont get associated over all random numbers you use -
> else it degenerates to current case.
>
> c) You seed the random number sensible, in order not to hit problems with
> having your tasks being non-repeatable.
>
> Regards,
> Mridul
>
>
> On Wednesday 09 November 2011 07:04 PM, Daan Gerits wrote:
>
>> Hello,
>>
>> First of all, great job creating pig, really a magnificent piece of
>> software.
>>
>> I do have a few questions about UDFs. I have a dataset with a list of
>> url's I want to fetch. Since an EvalFunc can only process one tuple at a
>> time and the asynchronous abilities of the UDF are deprecated, I can only
>> fetch one url at a time. The problem is that fetching this one url takes a
>> reasonable amount of time (1 to 5 seconds, there is a delay built in) so
>> that really slows down the processing. I already converted the UDF into an
>> Accumulator but that only seems to get fired after a group by. If would be
>> nice to have some kind of Queue UDF which will queue the tuples until a
>> certain amount is reached and than flushes the queue. That way I can add
>> tuples to an internal list and on flush start multiple threads to go
>> through the list of Tuples.
>>
>> This is a workaround though, since the best solution would be to
>> reintroduce the asynchronous UDF's (in which case I can schedule the
>> threads as the tuples come in)
>>
>> Any idea's on this? I already saw someone trying almost the same thing,
>> but didn't get a definite answer from that one.
>>
>> An other option is to increase the number of reducer slots on the
>> cluster, but I'm afraid that would mean we overload the nodes in case of a
>> heavy reduce phase.
>>
>> Best Regards,
>>
>> Daan
>>
>
>

Re: Multithreaded UDF

Posted by Jonathan Coveney <jc...@gmail.com>.
I love the TupleFuture idea. I wonder how hard it would be to do if you
could defer evaluation until serialization (maybe it serializes instantly?)

2011/11/9 Dmitriy Ryaboy <dv...@gmail.com>

> Could extend Tuple and return a TupleFuture object.
> I am guessing it'll just get evaluated immediately by the next operator and
> not actually gain you anything.
> It'd be neat to be able to do that sort of thing though.
> D
>
> On Wed, Nov 9, 2011 at 12:22 PM, Mridul Muralidharan
> <mr...@yahoo-inc.com>wrote:
>
> >
> > A simple solution would be to tag each tuple with a random number (such
> > that each number has multiple url's associated with it - but not too
> large
> > a number of urls), and simply group based on this field.
> > In the reducer, you get a bag of url's for each random number : at which
> > point, you can use multiple threads to fetch content and associate their
> > responses with the appropriate input tuple.
> >
> >
> > You only need to ensure that :
> > a) Too many tuples dont get associated with a single random number (to
> the
> > extent that it causes spills to disk).
> >
> > b) Too few tuples dont get associated over all random numbers you use -
> > else it degenerates to current case.
> >
> > c) You seed the random number sensible, in order not to hit problems with
> > having your tasks being non-repeatable.
> >
> > Regards,
> > Mridul
> >
> >
> > On Wednesday 09 November 2011 07:04 PM, Daan Gerits wrote:
> >
> >> Hello,
> >>
> >> First of all, great job creating pig, really a magnificent piece of
> >> software.
> >>
> >> I do have a few questions about UDFs. I have a dataset with a list of
> >> url's I want to fetch. Since an EvalFunc can only process one tuple at a
> >> time and the asynchronous abilities of the UDF are deprecated, I can
> only
> >> fetch one url at a time. The problem is that fetching this one url
> takes a
> >> reasonable amount of time (1 to 5 seconds, there is a delay built in) so
> >> that really slows down the processing. I already converted the UDF into
> an
> >> Accumulator but that only seems to get fired after a group by. If would
> be
> >> nice to have some kind of Queue UDF which will queue the tuples until a
> >> certain amount is reached and than flushes the queue. That way I can add
> >> tuples to an internal list and on flush start multiple threads to go
> >> through the list of Tuples.
> >>
> >> This is a workaround though, since the best solution would be to
> >> reintroduce the asynchronous UDF's (in which case I can schedule the
> >> threads as the tuples come in)
> >>
> >> Any idea's on this? I already saw someone trying almost the same thing,
> >> but didn't get a definite answer from that one.
> >>
> >> An other option is to increase the number of reducer slots on the
> >> cluster, but I'm afraid that would mean we overload the nodes in case
> of a
> >> heavy reduce phase.
> >>
> >> Best Regards,
> >>
> >> Daan
> >>
> >
> >
>

Re: Multithreaded UDF

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Could extend Tuple and return a TupleFuture object.
I am guessing it'll just get evaluated immediately by the next operator and
not actually gain you anything.
It'd be neat to be able to do that sort of thing though.
D

On Wed, Nov 9, 2011 at 12:22 PM, Mridul Muralidharan
<mr...@yahoo-inc.com>wrote:

>
> A simple solution would be to tag each tuple with a random number (such
> that each number has multiple url's associated with it - but not too large
> a number of urls), and simply group based on this field.
> In the reducer, you get a bag of url's for each random number : at which
> point, you can use multiple threads to fetch content and associate their
> responses with the appropriate input tuple.
>
>
> You only need to ensure that :
> a) Too many tuples dont get associated with a single random number (to the
> extent that it causes spills to disk).
>
> b) Too few tuples dont get associated over all random numbers you use -
> else it degenerates to current case.
>
> c) You seed the random number sensible, in order not to hit problems with
> having your tasks being non-repeatable.
>
> Regards,
> Mridul
>
>
> On Wednesday 09 November 2011 07:04 PM, Daan Gerits wrote:
>
>> Hello,
>>
>> First of all, great job creating pig, really a magnificent piece of
>> software.
>>
>> I do have a few questions about UDFs. I have a dataset with a list of
>> url's I want to fetch. Since an EvalFunc can only process one tuple at a
>> time and the asynchronous abilities of the UDF are deprecated, I can only
>> fetch one url at a time. The problem is that fetching this one url takes a
>> reasonable amount of time (1 to 5 seconds, there is a delay built in) so
>> that really slows down the processing. I already converted the UDF into an
>> Accumulator but that only seems to get fired after a group by. If would be
>> nice to have some kind of Queue UDF which will queue the tuples until a
>> certain amount is reached and than flushes the queue. That way I can add
>> tuples to an internal list and on flush start multiple threads to go
>> through the list of Tuples.
>>
>> This is a workaround though, since the best solution would be to
>> reintroduce the asynchronous UDF's (in which case I can schedule the
>> threads as the tuples come in)
>>
>> Any idea's on this? I already saw someone trying almost the same thing,
>> but didn't get a definite answer from that one.
>>
>> An other option is to increase the number of reducer slots on the
>> cluster, but I'm afraid that would mean we overload the nodes in case of a
>> heavy reduce phase.
>>
>> Best Regards,
>>
>> Daan
>>
>
>

Re: Multithreaded UDF

Posted by Mridul Muralidharan <mr...@yahoo-inc.com>.
A simple solution would be to tag each tuple with a random number (such 
that each number has multiple url's associated with it - but not too 
large a number of urls), and simply group based on this field.
In the reducer, you get a bag of url's for each random number : at which 
point, you can use multiple threads to fetch content and associate their 
responses with the appropriate input tuple.


You only need to ensure that :
a) Too many tuples dont get associated with a single random number (to 
the extent that it causes spills to disk).

b) Too few tuples dont get associated over all random numbers you use - 
else it degenerates to current case.

c) You seed the random number sensible, in order not to hit problems 
with having your tasks being non-repeatable.

Regards,
Mridul

On Wednesday 09 November 2011 07:04 PM, Daan Gerits wrote:
> Hello,
>
> First of all, great job creating pig, really a magnificent piece of software.
>
> I do have a few questions about UDFs. I have a dataset with a list of url's I want to fetch. Since an EvalFunc can only process one tuple at a time and the asynchronous abilities of the UDF are deprecated, I can only fetch one url at a time. The problem is that fetching this one url takes a reasonable amount of time (1 to 5 seconds, there is a delay built in) so that really slows down the processing. I already converted the UDF into an Accumulator but that only seems to get fired after a group by. If would be nice to have some kind of Queue UDF which will queue the tuples until a certain amount is reached and than flushes the queue. That way I can add tuples to an internal list and on flush start multiple threads to go through the list of Tuples.
>
> This is a workaround though, since the best solution would be to reintroduce the asynchronous UDF's (in which case I can schedule the threads as the tuples come in)
>
> Any idea's on this? I already saw someone trying almost the same thing, but didn't get a definite answer from that one.
>
> An other option is to increase the number of reducer slots on the cluster, but I'm afraid that would mean we overload the nodes in case of a heavy reduce phase.
>
> Best Regards,
>
> Daan


Re: Multithreaded UDF

Posted by Daan Gerits <da...@gmail.com>.
Hey everyone,

Thank you for the massive amount of feedback. Allow me to provide my own:

The 1 to 5 seconds is actually just idling, not doing anything actually except waiting for a random (between 1 and 5 secs) amount of time. It's part of some human behavior mimicking I do.

I was also thinking about streaming, but haven't looked into it yet since is seems to me to be pretty low-level, parsing stdin and posting to stdout. And as you say, I don't know how easy it would be to send urls and content through the streams without too much of a hassle. Maybe it's an option to use a serialization mechanism like avro or plain java serialization to pass the information. Anyway that's the backup plan, if all else fails :)

Actually my first idea was to create a loader to load the content from the web, but that seemed not very ideal since I want to use the udf not only during loading, but during a foreach. Maybe I'm getting this wrong, but you can't pass relations to the constructor of a load function? or can you?

As I'm wandering on, I have something which is nearly working:

queries         = FOREACH customers GENERATE id as key, url;
requests          = GROUP queries BY SUBSTRING(url, 0, 13);
fetchResults    = FOREACH requests {
                results = fetchHttp(queries);
                GENERATE FLATTEN(results);
}

results         = FOREACH fetchResults GENERATE now() as timestamp, url, FLATTEN(fetches);
cleanedResults  = FOREACH results GENERATE timestamp, url, page as page, duration as duration, removeWhiteSpace(content, 0) as content;

Any idea's on this approach?

Daan.

On 09 Nov 2011, at 19:54, Alan Gates wrote:

> Multi-threading of UDFs is not deprecated, it just isn't explicitly supported.  However, it should work.  The internal MonitoredUDF uses multiple threads.
> 
> Do you need to output records conditionally or modify the contents of the record based on the results of this http call?  If not, then you can place records in a queue as they go through and have a pool of worker threads doing the http calls in the background.  You can then use the finish() call to make sure your queue is empty and all your work threads finished.
> 
> The problem if you need to modify or remove records is that finish() doesn't let you return data.  So even though you could return a bag full of records you had finished for each record that came in (with some bags being empty, which a subsequent flatten could then remove), you would loose the last few records because you wouldn't get a chance to return them.  As suggested in a previous mail, streaming will do what you want in this case.
> 
> Alan.
> 
> 
> On Nov 9, 2011, at 5:34 AM, Daan Gerits wrote:
> 
>> Hello,
>> 
>> First of all, great job creating pig, really a magnificent piece of software.
>> 
>> I do have a few questions about UDFs. I have a dataset with a list of url's I want to fetch. Since an EvalFunc can only process one tuple at a time and the asynchronous abilities of the UDF are deprecated, I can only fetch one url at a time. The problem is that fetching this one url takes a reasonable amount of time (1 to 5 seconds, there is a delay built in) so that really slows down the processing. I already converted the UDF into an Accumulator but that only seems to get fired after a group by. If would be nice to have some kind of Queue UDF which will queue the tuples until a certain amount is reached and than flushes the queue. That way I can add tuples to an internal list and on flush start multiple threads to go through the list of Tuples.
>> 
>> This is a workaround though, since the best solution would be to reintroduce the asynchronous UDF's (in which case I can schedule the threads as the tuples come in)
>> 
>> Any idea's on this? I already saw someone trying almost the same thing, but didn't get a definite answer from that one.
>> 
>> An other option is to increase the number of reducer slots on the cluster, but I'm afraid that would mean we overload the nodes in case of a heavy reduce phase.
>> 
>> Best Regards,
>> 
>> Daan
> 


Re: Multithreaded UDF

Posted by Alan Gates <ga...@hortonworks.com>.
Multi-threading of UDFs is not deprecated, it just isn't explicitly supported.  However, it should work.  The internal MonitoredUDF uses multiple threads.

Do you need to output records conditionally or modify the contents of the record based on the results of this http call?  If not, then you can place records in a queue as they go through and have a pool of worker threads doing the http calls in the background.  You can then use the finish() call to make sure your queue is empty and all your work threads finished.

The problem if you need to modify or remove records is that finish() doesn't let you return data.  So even though you could return a bag full of records you had finished for each record that came in (with some bags being empty, which a subsequent flatten could then remove), you would loose the last few records because you wouldn't get a chance to return them.  As suggested in a previous mail, streaming will do what you want in this case.

Alan.


On Nov 9, 2011, at 5:34 AM, Daan Gerits wrote:

> Hello,
> 
> First of all, great job creating pig, really a magnificent piece of software.
> 
> I do have a few questions about UDFs. I have a dataset with a list of url's I want to fetch. Since an EvalFunc can only process one tuple at a time and the asynchronous abilities of the UDF are deprecated, I can only fetch one url at a time. The problem is that fetching this one url takes a reasonable amount of time (1 to 5 seconds, there is a delay built in) so that really slows down the processing. I already converted the UDF into an Accumulator but that only seems to get fired after a group by. If would be nice to have some kind of Queue UDF which will queue the tuples until a certain amount is reached and than flushes the queue. That way I can add tuples to an internal list and on flush start multiple threads to go through the list of Tuples.
> 
> This is a workaround though, since the best solution would be to reintroduce the asynchronous UDF's (in which case I can schedule the threads as the tuples come in)
> 
> Any idea's on this? I already saw someone trying almost the same thing, but didn't get a definite answer from that one.
> 
> An other option is to increase the number of reducer slots on the cluster, but I'm afraid that would mean we overload the nodes in case of a heavy reduce phase.
> 
> Best Regards,
> 
> Daan


Re: Multithreaded UDF

Posted by Raghu Angadi <an...@gmail.com>.
Assuming 1-5 seconds is mainly waiting for IO, using multiple reducers or
mapper might not be suitable since it just takes too many mapper an d
reducer slots. Couple of options:

1. use streaming : you have full control on how many you handle at a time.
Might be tricky to pass url content.

2. a hack: say you want handle 1000 urls at a time, write a simple loader
that extends PigStorage(), where getNext() looks something like :
 { DataBag bag = ...;
      for(int i=; i<1000; i++) {
         tuple = super.getNext();
         if (tuple == null) break;
         bag.add(tuple);
       }
       return bag.size() > 0 ? bag : null;
   }
and your UDF handles bag of tuples and returns a bag of tuples.

Raghu.
On Wed, Nov 9, 2011 at 9:12 AM, Jonathan Coveney <jc...@gmail.com> wrote:

> I don't get how this would be a win. Let's imagine you have a system that
> you're fully saturating with map tasks, such that you have, say, 50
> available cpus (after task tracker, job tracker, etc) and you send your job
> to 50 mappers...how is this different from 25 mappers with 2 threads
> apiece? I guess it depends on whether or not the 1 to 5 seconds that each
> task is spending blocking on some action. I guess you could enqueue all the
> URL fetches, and then have another thread process that. Either way, the
> semantics for such a UDF would be awkward and run counter to the typical
> m/r use case, imho. However, if you wanted to do something like this (and
> assuming that you want to avoid waiting a bunch for some blocking i/o),
> what you could do would be to make an accumulator UDF, but then do a group
> all. So you do:
>
> customers       = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
>                   USING PigStorage(',')
>                   AS (id:chararray, name:chararray, url:chararray);
>
> fetchResults    = FOREACH customers
>                   GENERATE id, name, url, fetchHttp(url);
>
> fetchResults = foreach (group customers all) generate customers.id,
> customers.name, fetchHttp(customers.url);
>
> this would cause the accumulator to be invoked, and you could just enqueue
> the elements of the input bag that you get and fire up a thread that begins
> fetching, and then once it is empty, begin processing the results of the
> fetch.
>
> note: that's pure theory and I don't know if it would actually be
> performant, but you could do it :)
>
> if you're not waiting on a bunch of IO, though, I don't see the gain. If
> you have 1-5s of actual work to do per url (not just waiting on the results
> of some long operations), then making it asynchronous won't change that.
>
> 2011/11/9 Daan Gerits <da...@gmail.com>
>
> > I expect you are talking about the 1-5 second delay I talked about. What
> I
> > actually meant was that the code within the exec function of the UDF is
> > taking 1 to 5 seconds for each invocation. That's something I cannot
> change
> > since the fetch method is actually doing a lot more than only fetching
> > something. I cannot push the additional logic the fetching is invoking
> > higher since that would break the algorithm.
> >
> >
> > On 09 Nov 2011, at 16:05, Marek Miglinski wrote:
> >
> > > Something is wrong with your calculations UDF, think of something,
> > because I had experience when I needed to calculate efficiency of data
> > sent/downloaded by user, the logic there was too complex and despite that
> > the speed was ~ 0.02s per user which had ~ 500 transactions each, so in
> > overall ~ 0.00004s per tx.
> > >
> > > Example of the code:
> > > userGroup = GROUP recordTx BY user PARALLEL 100;
> > > userFlattened = FOREACH userGroup {
> > >       generated = Merge(recordTx);
> > >       GENERATE FLATTEN(generated);
> > > }
> > >
> > >
> > > Sincerely,
> > > Marek M.
> > > ________________________________________
> > > From: Daan Gerits [daan.gerits@gmail.com]
> > > Sent: Wednesday, November 09, 2011 4:19 PM
> > > To: user@pig.apache.org
> > > Subject: Re: Multithreaded UDF
> > >
> > > Hi Marek,
> > >
> > > yes, I have:
> > >
> > > SET default_parallel 50;
> > >
> > > at the top of my script.
> > >
> > > The idea to use the udf is as follows:
> > >
> > > customers       = LOAD
> > 'hdfs://node1.c.foundation.local/data/customers.csv'
> > >                    USING PigStorage(',')
> > >                    AS (id:chararray, name:chararray, url:chararray);
> > >
> > > fetchResults    = FOREACH customers
> > >                    GENERATE id, name, url, fetchHttp(url);
> > >
> > > ending up with the following data structure:
> > > (id, name, url, {timestamp, content, fetchDuration})
> > >
> > > I am currently not yet using the group since I would like to find a
> > solution without first having to group everything. The workaround for me
> > would be to group everything on a field which I know is unique, that way
> I
> > won't loose the structure of the relation.
> > >
> > > Thanks for the quick reply,
> > >
> > > Daan
> > >
> > > On 09 Nov 2011, at 15:12, Marek Miglinski wrote:
> > >
> > >> Do you use parallels in the GROUP?
> > >> ________________________________________
> > >> From: Daan Gerits [daan.gerits@gmail.com]
> > >> Sent: Wednesday, November 09, 2011 3:34 PM
> > >> To: user@pig.apache.org
> > >> Subject: Multithreaded UDF
> > >>
> > >> Hello,
> > >>
> > >> First of all, great job creating pig, really a magnificent piece of
> > software.
> > >>
> > >> I do have a few questions about UDFs. I have a dataset with a list of
> > url's I want to fetch. Since an EvalFunc can only process one tuple at a
> > time and the asynchronous abilities of the UDF are deprecated, I can only
> > fetch one url at a time. The problem is that fetching this one url takes
> a
> > reasonable amount of time (1 to 5 seconds, there is a delay built in) so
> > that really slows down the processing. I already converted the UDF into
> an
> > Accumulator but that only seems to get fired after a group by. If would
> be
> > nice to have some kind of Queue UDF which will queue the tuples until a
> > certain amount is reached and than flushes the queue. That way I can add
> > tuples to an internal list and on flush start multiple threads to go
> > through the list of Tuples.
> > >>
> > >> This is a workaround though, since the best solution would be to
> > reintroduce the asynchronous UDF's (in which case I can schedule the
> > threads as the tuples come in)
> > >>
> > >> Any idea's on this? I already saw someone trying almost the same
> thing,
> > but didn't get a definite answer from that one.
> > >>
> > >> An other option is to increase the number of reducer slots on the
> > cluster, but I'm afraid that would mean we overload the nodes in case of
> a
> > heavy reduce phase.
> > >>
> > >> Best Regards,
> > >>
> > >> Daan
> > >
> >
> >
>

Re: Multithreaded UDF

Posted by Jonathan Coveney <jc...@gmail.com>.
I don't get how this would be a win. Let's imagine you have a system that
you're fully saturating with map tasks, such that you have, say, 50
available cpus (after task tracker, job tracker, etc) and you send your job
to 50 mappers...how is this different from 25 mappers with 2 threads
apiece? I guess it depends on whether or not the 1 to 5 seconds that each
task is spending blocking on some action. I guess you could enqueue all the
URL fetches, and then have another thread process that. Either way, the
semantics for such a UDF would be awkward and run counter to the typical
m/r use case, imho. However, if you wanted to do something like this (and
assuming that you want to avoid waiting a bunch for some blocking i/o),
what you could do would be to make an accumulator UDF, but then do a group
all. So you do:

customers       = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
                   USING PigStorage(',')
                   AS (id:chararray, name:chararray, url:chararray);

fetchResults    = FOREACH customers
                   GENERATE id, name, url, fetchHttp(url);

fetchResults = foreach (group customers all) generate customers.id,
customers.name, fetchHttp(customers.url);

this would cause the accumulator to be invoked, and you could just enqueue
the elements of the input bag that you get and fire up a thread that begins
fetching, and then once it is empty, begin processing the results of the
fetch.

note: that's pure theory and I don't know if it would actually be
performant, but you could do it :)

if you're not waiting on a bunch of IO, though, I don't see the gain. If
you have 1-5s of actual work to do per url (not just waiting on the results
of some long operations), then making it asynchronous won't change that.

2011/11/9 Daan Gerits <da...@gmail.com>

> I expect you are talking about the 1-5 second delay I talked about. What I
> actually meant was that the code within the exec function of the UDF is
> taking 1 to 5 seconds for each invocation. That's something I cannot change
> since the fetch method is actually doing a lot more than only fetching
> something. I cannot push the additional logic the fetching is invoking
> higher since that would break the algorithm.
>
>
> On 09 Nov 2011, at 16:05, Marek Miglinski wrote:
>
> > Something is wrong with your calculations UDF, think of something,
> because I had experience when I needed to calculate efficiency of data
> sent/downloaded by user, the logic there was too complex and despite that
> the speed was ~ 0.02s per user which had ~ 500 transactions each, so in
> overall ~ 0.00004s per tx.
> >
> > Example of the code:
> > userGroup = GROUP recordTx BY user PARALLEL 100;
> > userFlattened = FOREACH userGroup {
> >       generated = Merge(recordTx);
> >       GENERATE FLATTEN(generated);
> > }
> >
> >
> > Sincerely,
> > Marek M.
> > ________________________________________
> > From: Daan Gerits [daan.gerits@gmail.com]
> > Sent: Wednesday, November 09, 2011 4:19 PM
> > To: user@pig.apache.org
> > Subject: Re: Multithreaded UDF
> >
> > Hi Marek,
> >
> > yes, I have:
> >
> > SET default_parallel 50;
> >
> > at the top of my script.
> >
> > The idea to use the udf is as follows:
> >
> > customers       = LOAD
> 'hdfs://node1.c.foundation.local/data/customers.csv'
> >                    USING PigStorage(',')
> >                    AS (id:chararray, name:chararray, url:chararray);
> >
> > fetchResults    = FOREACH customers
> >                    GENERATE id, name, url, fetchHttp(url);
> >
> > ending up with the following data structure:
> > (id, name, url, {timestamp, content, fetchDuration})
> >
> > I am currently not yet using the group since I would like to find a
> solution without first having to group everything. The workaround for me
> would be to group everything on a field which I know is unique, that way I
> won't loose the structure of the relation.
> >
> > Thanks for the quick reply,
> >
> > Daan
> >
> > On 09 Nov 2011, at 15:12, Marek Miglinski wrote:
> >
> >> Do you use parallels in the GROUP?
> >> ________________________________________
> >> From: Daan Gerits [daan.gerits@gmail.com]
> >> Sent: Wednesday, November 09, 2011 3:34 PM
> >> To: user@pig.apache.org
> >> Subject: Multithreaded UDF
> >>
> >> Hello,
> >>
> >> First of all, great job creating pig, really a magnificent piece of
> software.
> >>
> >> I do have a few questions about UDFs. I have a dataset with a list of
> url's I want to fetch. Since an EvalFunc can only process one tuple at a
> time and the asynchronous abilities of the UDF are deprecated, I can only
> fetch one url at a time. The problem is that fetching this one url takes a
> reasonable amount of time (1 to 5 seconds, there is a delay built in) so
> that really slows down the processing. I already converted the UDF into an
> Accumulator but that only seems to get fired after a group by. If would be
> nice to have some kind of Queue UDF which will queue the tuples until a
> certain amount is reached and than flushes the queue. That way I can add
> tuples to an internal list and on flush start multiple threads to go
> through the list of Tuples.
> >>
> >> This is a workaround though, since the best solution would be to
> reintroduce the asynchronous UDF's (in which case I can schedule the
> threads as the tuples come in)
> >>
> >> Any idea's on this? I already saw someone trying almost the same thing,
> but didn't get a definite answer from that one.
> >>
> >> An other option is to increase the number of reducer slots on the
> cluster, but I'm afraid that would mean we overload the nodes in case of a
> heavy reduce phase.
> >>
> >> Best Regards,
> >>
> >> Daan
> >
>
>

Re: Multithreaded UDF

Posted by Daan Gerits <da...@gmail.com>.
I expect you are talking about the 1-5 second delay I talked about. What I actually meant was that the code within the exec function of the UDF is taking 1 to 5 seconds for each invocation. That's something I cannot change since the fetch method is actually doing a lot more than only fetching something. I cannot push the additional logic the fetching is invoking higher since that would break the algorithm.


On 09 Nov 2011, at 16:05, Marek Miglinski wrote:

> Something is wrong with your calculations UDF, think of something, because I had experience when I needed to calculate efficiency of data sent/downloaded by user, the logic there was too complex and despite that the speed was ~ 0.02s per user which had ~ 500 transactions each, so in overall ~ 0.00004s per tx.
> 
> Example of the code:
> userGroup = GROUP recordTx BY user PARALLEL 100;
> userFlattened = FOREACH userGroup {
> 	generated = Merge(recordTx);
> 	GENERATE FLATTEN(generated); 
> }
> 
> 
> Sincerely,
> Marek M.
> ________________________________________
> From: Daan Gerits [daan.gerits@gmail.com]
> Sent: Wednesday, November 09, 2011 4:19 PM
> To: user@pig.apache.org
> Subject: Re: Multithreaded UDF
> 
> Hi Marek,
> 
> yes, I have:
> 
> SET default_parallel 50;
> 
> at the top of my script.
> 
> The idea to use the udf is as follows:
> 
> customers       = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
>                    USING PigStorage(',')
>                    AS (id:chararray, name:chararray, url:chararray);
> 
> fetchResults    = FOREACH customers
>                    GENERATE id, name, url, fetchHttp(url);
> 
> ending up with the following data structure:
> (id, name, url, {timestamp, content, fetchDuration})
> 
> I am currently not yet using the group since I would like to find a solution without first having to group everything. The workaround for me would be to group everything on a field which I know is unique, that way I won't loose the structure of the relation.
> 
> Thanks for the quick reply,
> 
> Daan
> 
> On 09 Nov 2011, at 15:12, Marek Miglinski wrote:
> 
>> Do you use parallels in the GROUP?
>> ________________________________________
>> From: Daan Gerits [daan.gerits@gmail.com]
>> Sent: Wednesday, November 09, 2011 3:34 PM
>> To: user@pig.apache.org
>> Subject: Multithreaded UDF
>> 
>> Hello,
>> 
>> First of all, great job creating pig, really a magnificent piece of software.
>> 
>> I do have a few questions about UDFs. I have a dataset with a list of url's I want to fetch. Since an EvalFunc can only process one tuple at a time and the asynchronous abilities of the UDF are deprecated, I can only fetch one url at a time. The problem is that fetching this one url takes a reasonable amount of time (1 to 5 seconds, there is a delay built in) so that really slows down the processing. I already converted the UDF into an Accumulator but that only seems to get fired after a group by. If would be nice to have some kind of Queue UDF which will queue the tuples until a certain amount is reached and than flushes the queue. That way I can add tuples to an internal list and on flush start multiple threads to go through the list of Tuples.
>> 
>> This is a workaround though, since the best solution would be to reintroduce the asynchronous UDF's (in which case I can schedule the threads as the tuples come in)
>> 
>> Any idea's on this? I already saw someone trying almost the same thing, but didn't get a definite answer from that one.
>> 
>> An other option is to increase the number of reducer slots on the cluster, but I'm afraid that would mean we overload the nodes in case of a heavy reduce phase.
>> 
>> Best Regards,
>> 
>> Daan
> 


RE: Multithreaded UDF

Posted by Marek Miglinski <mm...@seven.com>.
Something is wrong with your calculations UDF, think of something, because I had experience when I needed to calculate efficiency of data sent/downloaded by user, the logic there was too complex and despite that the speed was ~ 0.02s per user which had ~ 500 transactions each, so in overall ~ 0.00004s per tx.

Example of the code:
userGroup = GROUP recordTx BY user PARALLEL 100;
userFlattened = FOREACH userGroup {
	generated = Merge(recordTx);
	GENERATE FLATTEN(generated); 
}


Sincerely,
Marek M.
________________________________________
From: Daan Gerits [daan.gerits@gmail.com]
Sent: Wednesday, November 09, 2011 4:19 PM
To: user@pig.apache.org
Subject: Re: Multithreaded UDF

Hi Marek,

yes, I have:

SET default_parallel 50;

at the top of my script.

The idea to use the udf is as follows:

customers       = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
                    USING PigStorage(',')
                    AS (id:chararray, name:chararray, url:chararray);

fetchResults    = FOREACH customers
                    GENERATE id, name, url, fetchHttp(url);

ending up with the following data structure:
(id, name, url, {timestamp, content, fetchDuration})

I am currently not yet using the group since I would like to find a solution without first having to group everything. The workaround for me would be to group everything on a field which I know is unique, that way I won't loose the structure of the relation.

Thanks for the quick reply,

Daan

On 09 Nov 2011, at 15:12, Marek Miglinski wrote:

> Do you use parallels in the GROUP?
> ________________________________________
> From: Daan Gerits [daan.gerits@gmail.com]
> Sent: Wednesday, November 09, 2011 3:34 PM
> To: user@pig.apache.org
> Subject: Multithreaded UDF
>
> Hello,
>
> First of all, great job creating pig, really a magnificent piece of software.
>
> I do have a few questions about UDFs. I have a dataset with a list of url's I want to fetch. Since an EvalFunc can only process one tuple at a time and the asynchronous abilities of the UDF are deprecated, I can only fetch one url at a time. The problem is that fetching this one url takes a reasonable amount of time (1 to 5 seconds, there is a delay built in) so that really slows down the processing. I already converted the UDF into an Accumulator but that only seems to get fired after a group by. If would be nice to have some kind of Queue UDF which will queue the tuples until a certain amount is reached and than flushes the queue. That way I can add tuples to an internal list and on flush start multiple threads to go through the list of Tuples.
>
> This is a workaround though, since the best solution would be to reintroduce the asynchronous UDF's (in which case I can schedule the threads as the tuples come in)
>
> Any idea's on this? I already saw someone trying almost the same thing, but didn't get a definite answer from that one.
>
> An other option is to increase the number of reducer slots on the cluster, but I'm afraid that would mean we overload the nodes in case of a heavy reduce phase.
>
> Best Regards,
>
> Daan


Re: Multithreaded UDF

Posted by Daan Gerits <da...@gmail.com>.
Hi Marek,

yes, I have:

SET default_parallel 50;

at the top of my script.

The idea to use the udf is as follows:

customers       = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
                    USING PigStorage(',')
                    AS (id:chararray, name:chararray, url:chararray);

fetchResults    = FOREACH customers
                    GENERATE id, name, url, fetchHttp(url);

ending up with the following data structure:
(id, name, url, {timestamp, content, fetchDuration})

I am currently not yet using the group since I would like to find a solution without first having to group everything. The workaround for me would be to group everything on a field which I know is unique, that way I won't loose the structure of the relation.

Thanks for the quick reply,

Daan

On 09 Nov 2011, at 15:12, Marek Miglinski wrote:

> Do you use parallels in the GROUP?
> ________________________________________
> From: Daan Gerits [daan.gerits@gmail.com]
> Sent: Wednesday, November 09, 2011 3:34 PM
> To: user@pig.apache.org
> Subject: Multithreaded UDF
> 
> Hello,
> 
> First of all, great job creating pig, really a magnificent piece of software.
> 
> I do have a few questions about UDFs. I have a dataset with a list of url's I want to fetch. Since an EvalFunc can only process one tuple at a time and the asynchronous abilities of the UDF are deprecated, I can only fetch one url at a time. The problem is that fetching this one url takes a reasonable amount of time (1 to 5 seconds, there is a delay built in) so that really slows down the processing. I already converted the UDF into an Accumulator but that only seems to get fired after a group by. If would be nice to have some kind of Queue UDF which will queue the tuples until a certain amount is reached and than flushes the queue. That way I can add tuples to an internal list and on flush start multiple threads to go through the list of Tuples.
> 
> This is a workaround though, since the best solution would be to reintroduce the asynchronous UDF's (in which case I can schedule the threads as the tuples come in)
> 
> Any idea's on this? I already saw someone trying almost the same thing, but didn't get a definite answer from that one.
> 
> An other option is to increase the number of reducer slots on the cluster, but I'm afraid that would mean we overload the nodes in case of a heavy reduce phase.
> 
> Best Regards,
> 
> Daan


RE: Multithreaded UDF

Posted by Marek Miglinski <mm...@seven.com>.
Do you use parallels in the GROUP?
________________________________________
From: Daan Gerits [daan.gerits@gmail.com]
Sent: Wednesday, November 09, 2011 3:34 PM
To: user@pig.apache.org
Subject: Multithreaded UDF

Hello,

First of all, great job creating pig, really a magnificent piece of software.

I do have a few questions about UDFs. I have a dataset with a list of url's I want to fetch. Since an EvalFunc can only process one tuple at a time and the asynchronous abilities of the UDF are deprecated, I can only fetch one url at a time. The problem is that fetching this one url takes a reasonable amount of time (1 to 5 seconds, there is a delay built in) so that really slows down the processing. I already converted the UDF into an Accumulator but that only seems to get fired after a group by. If would be nice to have some kind of Queue UDF which will queue the tuples until a certain amount is reached and than flushes the queue. That way I can add tuples to an internal list and on flush start multiple threads to go through the list of Tuples.

This is a workaround though, since the best solution would be to reintroduce the asynchronous UDF's (in which case I can schedule the threads as the tuples come in)

Any idea's on this? I already saw someone trying almost the same thing, but didn't get a definite answer from that one.

An other option is to increase the number of reducer slots on the cluster, but I'm afraid that would mean we overload the nodes in case of a heavy reduce phase.

Best Regards,

Daan