Posted to user@pig.apache.org by Sandesh Devaraju <sa...@gmail.com> on 2010/04/14 01:16:14 UTC

Possible check for speculative execution cancellation in finish() of storage UDF

Hi All,

I am using PIG-1229 to write pig query output to a database. However,
I noticed that because of speculative execution, spurious records end
up being written.

I was wondering if there is a way to infer whether the current reduce
task is running in a speculative slot that was cancelled (and hence a
rollback needs to be issued).

Thanks in advance!

- Sandesh

Re: Possible check for speculative execution cancellation in finish() of storage UDF

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Option 3 comes up in a non-parallelized scenario when you are essentially
spinning out a Data Mart.
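
A minimal sketch of Option 3, assuming each task's committed output lands as a
tab-delimited part file and a single process loads all of them after the job
finishes. Python's sqlite3 stands in for the target database; the `results`
table, the `part-NNNNN` naming, and the helper names are illustrative
assumptions, not anything from PIG-1229. Because Hadoop's file output commit
promotes only the successful attempt's output, the import step should never
see rows from cancelled speculative attempts.

```python
import csv
import os
import sqlite3
import tempfile


def write_part_file(out_dir, task_id, rows):
    """Stand-in for one task's committed output: one delimited file per task."""
    path = os.path.join(out_dir, f"part-{task_id:05d}")
    with open(path, "w", newline="") as f:
        csv.writer(f, delimiter="\t").writerows(rows)
    return path


def bulk_import(conn, out_dir):
    """Load every part file in a single transaction once the job is done."""
    loaded = 0
    with conn:  # commits on success, rolls back if any file fails to load
        for name in sorted(os.listdir(out_dir)):
            if not name.startswith("part-"):
                continue
            with open(os.path.join(out_dir, name), newline="") as f:
                rows = [(k, int(v)) for k, v in csv.reader(f, delimiter="\t")]
            conn.executemany("INSERT INTO results (k, v) VALUES (?, ?)", rows)
            loaded += len(rows)
    return loaded


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE results (k TEXT, v INTEGER)")
    with tempfile.TemporaryDirectory() as out_dir:
        write_part_file(out_dir, 0, [("a", 1), ("b", 2)])
        write_part_file(out_dir, 1, [("c", 3)])
        loaded = bulk_import(conn, out_dir)
    total = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
    conn.close()
    return loaded, total
```

In a real deployment the import step would more likely be the database's own
bulk loader, scheduled to run after the Pig job.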

You need to do writes in parallel when you are inserting into something like
Vertica or Aster or Greenplum, which can sustain a high write volume by
maintaining many connections to many nodes, and you have a lot of data to
export -- perhaps you are using Pig as an extremely parallelizable ETL tool.

I can see Option 1 being useful when you know your output volume is very low
-- just some aggregate rows, on the order of a few hundred per reducer.

Which is probably very common, actually.

-Dmitriy

On Tue, Apr 13, 2010 at 6:03 PM, hc busy <hc...@gmail.com> wrote:

> Does anybody have comments on Option 1 versus Option 3?
>
> Personally I haven't run into any case where anything other than 3 makes
> sense. Like, when do you need to parallel insert?

Re: Possible check for speculative execution cancellation in finish() of storage UDF

Posted by hc busy <hc...@gmail.com>.
Does anybody have comments on Option 1 versus Option 3?

Personally I haven't run into any case where anything other than 3 makes
sense. Like, when do you need to parallel insert?



On Tue, Apr 13, 2010 at 4:49 PM, Dmitriy Ryaboy <dv...@gmail.com> wrote:

> Option 1: write everything in a given mapper in one big transaction, roll
> back if killed (this is obviously a performance killer)
>
> Option 2: on spin-up, the task creates a temporary table by copying the
> definition from the main table; the allFinished() method, or whatever we are
> calling it now, moves data from the temp tables of successful attempts into
> the main table. Also not awesome.
>
> Option 3: Write to fs, bulk import into a database at the end of your job.
> Safest, sanest, most parallelizable. See dependency tools like the recently
> open-sourced Azkaban for making life easier in that regard.
>
> -Dmitriy

Re: Possible check for speculative execution cancellation in finish() of storage UDF

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
not if you are inserting with auto-increment...
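
One reading of this remark, sketched under assumptions: if the target table is
keyed by an auto-increment id, the same record written by two attempts does
not collide and simply shows up twice, whereas a table keyed by the record's
own key can reject the repeat. The table names are hypothetical, sqlite3
stands in for the real database, and `INSERT OR IGNORE` is SQLite syntax (other
databases spell this differently).

```python
import sqlite3


def demo():
    conn = sqlite3.connect(":memory:")
    # Surrogate auto-increment key: a repeated record is accepted as a new row.
    conn.execute(
        "CREATE TABLE by_auto_id ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, k TEXT, v INTEGER)"
    )
    # Natural key: a repeated record collides with the existing row.
    conn.execute("CREATE TABLE by_natural_key (k TEXT PRIMARY KEY, v INTEGER)")

    record = ("a", 1)
    for _attempt in range(2):  # the original attempt plus one speculative attempt
        conn.execute("INSERT INTO by_auto_id (k, v) VALUES (?, ?)", record)
        conn.execute(
            "INSERT OR IGNORE INTO by_natural_key (k, v) VALUES (?, ?)", record
        )
    conn.commit()

    auto_rows = conn.execute("SELECT COUNT(*) FROM by_auto_id").fetchone()[0]
    natural_rows = conn.execute("SELECT COUNT(*) FROM by_natural_key").fetchone()[0]
    conn.close()
    return auto_rows, natural_rows
```

Here `demo()` returns `(2, 1)`: two rows under the auto-increment key, one
under the natural key.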

-D

On Tue, Apr 13, 2010 at 5:14 PM, Ashutosh Chauhan <
ashutosh.chauhan@gmail.com> wrote:

> Sandesh,
>
> Which perf penalty are you trying to avoid? If you are writing same
> record from four different reducers (which will happen with S.E.
> turned on) you are only straining your DB.
>
> Ashutosh

Re: Possible check for speculative execution cancellation in finish() of storage UDF

Posted by Ashutosh Chauhan <as...@gmail.com>.
Sandesh,

Which perf penalty are you trying to avoid? If you are writing same
record from four different reducers (which will happen with S.E.
turned on) you are only straining your DB.

Ashutosh
On Tue, Apr 13, 2010 at 17:12, Dmitriy Ryaboy <dv...@gmail.com> wrote:
> You don't have to do anything for that -- if a DB connection goes away, and
> the transaction is not committed, it will be rolled back.
>
> But this is a terrible idea for medium to large-sized data, or long-running
> tasks.
>
> I haven't looked at the patch, but I assume you would need to change how it
> works with transactions to get this to work.
>
> -D

Re: Possible check for speculative execution cancellation in finish() of storage UDF

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
You don't have to do anything for that -- if a DB connection goes away, and
the transaction is not committed, it will be rolled back.

But this is a terrible idea for medium to large-sized data, or long-running
tasks.

I haven't looked at the patch, but I assume you would need to change how it
works with transactions to get this to work.
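
A small sketch of that behaviour, assuming the storage code keeps one
connection per task attempt and commits only at the very end. Python's sqlite3
with a temporary file stands in for the real database, and the `killed` flag is
a hypothetical stand-in for the attempt being cancelled before it reaches its
commit; nothing here reflects how the PIG-1229 patch actually handles
transactions.

```python
import os
import sqlite3
import tempfile


def write_attempt(db_path, rows, killed):
    """Insert all rows for one task attempt inside a single transaction."""
    conn = sqlite3.connect(db_path)
    try:
        conn.executemany("INSERT INTO results (k, v) VALUES (?, ?)", rows)
        if killed:
            # Simulated cancellation: the connection goes away before commit,
            # so the pending inserts are discarded by the database.
            return
        conn.commit()
    finally:
        conn.close()


def count_rows(db_path):
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
    finally:
        conn.close()


def demo():
    fd, db_path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    try:
        conn = sqlite3.connect(db_path)
        conn.execute("CREATE TABLE results (k TEXT, v INTEGER)")
        conn.commit()
        conn.close()

        rows = [("a", 1), ("b", 2)]
        write_attempt(db_path, rows, killed=True)
        after_killed = count_rows(db_path)
        write_attempt(db_path, rows, killed=False)
        after_success = count_rows(db_path)
        return after_killed, after_success
    finally:
        os.remove(db_path)
```

The cost noted above still applies: the whole attempt's output sits in one open
transaction until the final commit.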

-D

On Tue, Apr 13, 2010 at 5:04 PM, Sandesh Devaraju <
sandesh.devaraju@gmail.com> wrote:

> @Ashutosh: I am currently running the task with speculative execution
> turned off, but was wondering if there is a way to avoid the
> performance penalty.
>
> @Dmitriy: I would like to try out option 1 - any pointers on how to
> infer this "killed" status in the UDF?

Re: Possible check for speculative execution cancellation in finish() of storage UDF

Posted by Sandesh Devaraju <sa...@gmail.com>.
@Ashutosh: I am currently running the task with speculative execution
turned off, but was wondering if there is a way to avoid the
performance penalty.

@Dmitriy: I would like to try out option 1 - any pointers on how to
infer this "killed" status in the UDF?

On Tuesday, April 13, 2010, Dmitriy Ryaboy <dv...@gmail.com> wrote:
> Option 1: write everything in a given mapper in one big transaction, roll
> back if killed (this is obviously a performance killer)
>
> Option 2: on spin-up, the task creates a temporary table by copying the
> definition from the main table; the allFinished() method, or whatever we are
> calling it now, moves data from the temp tables of successful attempts into
> the main table. Also not awesome.
>
> Option 3: Write to fs, bulk import into a database at the end of your job.
> Safest, sanest, most parallelizable. See dependency tools like the recently
> open-sourced Azkaban for making life easier in that regard.
>
> -Dmitriy

Re: Possible check for speculative execution cancellation in finish() of storage UDF

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Option 1: write everything in a given mapper in one big transaction, roll
back if killed (this is obviously a performance killer)

Option 2: on spin-up, the task creates a temporary table by copying the
definition from the main table; the allFinished() method, or whatever we are
calling it now, moves data from the temp tables of successful attempts into
the main table. Also not awesome.

Option 3: Write to fs, bulk import into a database at the end of your job.
Safest, sanest, most parallelizable. See dependency tools like the recently
open-sourced Azkaban for making life easier in that regard.
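
Option 2 is the least obvious of the three, so here is a rough sketch of it
under stated assumptions: each task attempt writes to its own staging table
cloned from the main table, and an end-of-job step merges only the attempts
reported as successful. sqlite3 stands in for the real database; the attempt
ids, the `results` table, and the `start_attempt` / `finish_job` helpers are
hypothetical names, and how the set of successful attempts is obtained is left
open.

```python
import sqlite3


def start_attempt(conn, attempt_id):
    """Create an empty staging table with the main table's columns."""
    table = f"results_{attempt_id}"
    # CREATE TABLE ... AS SELECT ... WHERE 0 copies the columns but no rows.
    # attempt_id is assumed to be a trusted identifier, not user input.
    conn.execute(f'CREATE TABLE "{table}" AS SELECT * FROM results WHERE 0')
    return table


def write_rows(conn, table, rows):
    conn.executemany(f'INSERT INTO "{table}" VALUES (?, ?)', rows)


def finish_job(conn, staging_tables, successful):
    """Merge staging tables of successful attempts, then drop all of them."""
    with conn:  # one transaction: commit on success, roll back on error
        for attempt_id, table in staging_tables.items():
            if attempt_id in successful:
                conn.execute(f'INSERT INTO results SELECT * FROM "{table}"')
            conn.execute(f'DROP TABLE "{table}"')


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE results (k TEXT, v INTEGER)")

    staging = {}
    for attempt_id in ("r0_attempt0", "r0_attempt1", "r1_attempt0"):
        staging[attempt_id] = start_attempt(conn, attempt_id)

    # r0_attempt0 is the speculative attempt that was killed part-way through.
    write_rows(conn, staging["r0_attempt0"], [("a", 1)])
    write_rows(conn, staging["r0_attempt1"], [("a", 1), ("b", 2)])
    write_rows(conn, staging["r1_attempt0"], [("c", 3)])

    finish_job(conn, staging, successful={"r0_attempt1", "r1_attempt0"})
    rows = conn.execute("SELECT k, v FROM results ORDER BY k").fetchall()
    conn.close()
    return rows
```

The partial rows from the killed attempt never reach `results`, at the price of
one extra table per attempt and a second copy of the data during the merge.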

-Dmitriy

On Tue, Apr 13, 2010 at 4:35 PM, Ashutosh Chauhan <
ashutosh.chauhan@gmail.com> wrote:

> Sandesh,
>
> As a workaround you can set the property
> mapred.[map|reduce].max.attempts to 1, which I believe will turn off
> speculative execution. You can pass this as a -D switch on the pig command
> line or through mapred-site.xml. The proper way to do it would be the way
> you suggested (though that will be less performant as well as complex
> to implement). You may also want to comment on that jira with your
> issue.
>
> Ashutosh

Re: Possible check for speculative execution cancellation in finish() of storage UDF

Posted by Ashutosh Chauhan <as...@gmail.com>.
Sandesh,

As a workaround you can set the property
mapred.[map|reduce].max.attempts to 1, which I believe will turn off
speculative execution. You can pass this as a -D switch on the pig command
line or through mapred-site.xml. The proper way to do it would be the way
you suggested (though that will be less performant as well as complex
to implement). You may also want to comment on that jira with your
issue.
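
For reference, a small sketch of passing job properties as -D switches ahead
of the script name. Note the assumption: the two property names used here are
the dedicated speculative-execution switches from older Hadoop releases, not
the max.attempts properties mentioned above, and the right names depend on the
Hadoop version in use, so check them against your cluster's mapred-default.xml.
The helper and the `export.pig` script name are hypothetical.

```python
def pig_command(script, disable_speculation=True):
    """Build an argument list for the pig launcher, -D properties first."""
    props = {}
    if disable_speculation:
        # Assumed property names (older Hadoop releases); verify per version.
        props["mapred.map.tasks.speculative.execution"] = "false"
        props["mapred.reduce.tasks.speculative.execution"] = "false"

    args = ["pig"]
    args += [f"-D{name}={value}" for name, value in props.items()]
    args.append(script)
    return args


if __name__ == "__main__":
    # Print the command line rather than running it.
    print(" ".join(pig_command("export.pig")))
```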

Ashutosh

On Tue, Apr 13, 2010 at 16:16, Sandesh Devaraju
<sa...@gmail.com> wrote:
> Hi All,
>
> I am using PIG-1229 to write pig query output to a database. However,
> I noticed that because of speculative execution, spurious records end
> up being written.
>
> I was wondering if there is a way to infer whether the current reduce
> task is running in a speculative slot that was cancelled (and hence a
> rollback needs to be issued).
>
> Thanks in advance!
>
> - Sandesh
>