Posted to common-user@hadoop.apache.org by Koert Kuipers <ko...@tresata.com> on 2012/10/12 16:30:19 UTC

concurrency

We have a dataset that is heavily partitioned, like this:
/data
  partition1/
    _SUCCESS
    part-00000
    part-00001
    ...
  partition2/
    _SUCCESS
    part-00000
    part-00001
    ...
  ...

We have loaders that use map-red jobs to add new partitions to this data
set at a regular interval (so they write to new sub-directories).

We also have map-red queries that read from the entire dataset (/data/*).
My worry here is concurrency: it is bound to happen that a query job runs
while a loader job is adding a new partition. Is there a risk that the
query could read incomplete or corrupt files? Is there a way to use the
_SUCCESS files to prevent this from happening?
Thanks for your time!
Best,
Koert

Re: concurrency

Posted by Koert Kuipers <ko...@tresata.com>.
Hey Harsh & Joep,

My main worry was actually the simpler situation in which only new subdirs
are created by loaders. If we focus for a moment on this "append-only"
situation, which I admit is only a subset of all cases, even then it is
not entirely clear to me how to go about this. Right now I pass my query
job (the one that reads) a path like /data/*, which means I do not check
whether every subdir has a _SUCCESS flag. If I wanted to add this check, I
would have to perform it myself for every subdir (with another glob
expression, I suppose) and then pass the job a constructed set of paths
instead of my simple glob. That set could become very large (thousands of
paths). Is this what you are suggesting, Harsh?
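To make that concrete, something like this is what I have in mind (a
hypothetical sketch against the stock FileSystem and new-API
FileInputFormat; the class and method names are made up):

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CommittedPartitions {
  // Add every subdir of dataRoot that carries a _SUCCESS flag
  // as an input path of the query job.
  public static void addCommittedInputs(Job job, Path dataRoot)
      throws IOException {
    FileSystem fs = dataRoot.getFileSystem(job.getConfiguration());
    for (FileStatus status : fs.listStatus(dataRoot)) {
      if (!status.isDir()) {
        continue; // ignore stray files directly under the root
      }
      Path partition = status.getPath();
      // the output committer drops _SUCCESS when a loader job finishes
      if (fs.exists(new Path(partition, "_SUCCESS"))) {
        FileInputFormat.addInputPath(job, partition);
      }
    }
  }
}

Each addInputPath call just appends to the job's input-path setting, so
thousands of paths mostly mean a very long configuration value.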

Now Joep rightly points out that reality is never that simple. We also
have jobs like the ones he described, which compact data in place, and
other fun stuff like that. For us the typical situations are compactions,
but we also do roll-ups (where we replace many subdirs holding hourly
updates with a single subdir for, say, a month). Right now my approach is
to do this stuff once a week in a "reserved" window in which no queries
are running, but I would love to hear suggestions for this as well.

Best (and greetings, Joep!)
Koert



Re: concurrency

Posted by Harsh J <ha...@cloudera.com>.
Joep,

You're right - I had missed in my quick scan that he was actually
replacing files in place. Sorry for the confusion, Koert!

-- 
Harsh J

Re: concurrency

Posted by "J. Rottinghuis" <jr...@gmail.com>.
Hi Harsh, morning Koert,

If Koert's problem is similar to what I have been thinking about, where we
want to consolidate and re-compress older datasets, then _SUCCESS does not
really help: _SUCCESS tells you that a new dataset has been completely
written, but what is needed here is to replace an existing dataset.

Naive approach:
The new set can be generated in parallel. The old directory is moved out
of the way (rm, and therefore moved to Trash) and then the new directory
is renamed into place.
I think the problem Koert is describing is how not to mess up map-reduce
jobs that have already started and may have read some, but not all, of
the files in the directory. If you're lucky, you'll try to read a file
that is no longer there; if you're unlucky, you read a new file with the
same name and never know that you have inconsistent results.
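For illustration, the swap itself comes down to two renames (a
hypothetical sketch; the ".old" suffix is made up, and a real
implementation would move the retired directory to Trash):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SwapDataset {
  // Replace the live dataset directory with a freshly written one.
  // Each rename is atomic in HDFS, but the pair of them is not, so
  // readers that already listed the old files can still be bitten.
  public static void swap(Configuration conf, Path live, Path fresh)
      throws IOException {
    FileSystem fs = live.getFileSystem(conf);
    Path retired = new Path(live.getParent(),
        live.getName() + ".old." + System.currentTimeMillis());
    if (!fs.rename(live, retired)) {
      throw new IOException("could not move " + live + " aside");
    }
    if (!fs.rename(fresh, live)) {
      throw new IOException("could not move " + fresh + " into place");
    }
  }
}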

Trying-to-be-clever approach:
Every query puts a "lock" file with its job-id in each directory it reads.
Only when there are no locks is the dataset replaced, as described in the
naive approach. This reduces the odds of problems, but is rife with race
conditions. Also, if the data is read-heavy, you may never get to replace
the directory; now you need a write lock to prevent new reads from
starting.
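A rough sketch of that lock dance (hypothetical; the "_lock_" naming is
made up, and as said this is not race-free):

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DirLocks {
  // A reader announces itself before scanning the directory.
  // createNewFile is atomic, but a writer may pass the noReaders
  // check in between a reader's own check and its lock creation.
  public static Path acquire(FileSystem fs, Path dir, String jobId)
      throws IOException {
    Path lock = new Path(dir, "_lock_" + jobId);
    if (!fs.createNewFile(lock)) {
      throw new IOException("lock already held: " + lock);
    }
    return lock;
  }

  // The replacer proceeds only when no reader locks are present.
  public static boolean noReaders(FileSystem fs, Path dir)
      throws IOException {
    FileStatus[] locks = fs.globStatus(new Path(dir, "_lock_*"));
    return locks == null || locks.length == 0;
  }

  // Readers clean up after themselves when done.
  public static void release(FileSystem fs, Path lock) throws IOException {
    fs.delete(lock, false);
  }
}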

Would hardlinks solve this problem?
Simply create a set of (temporary) hardlinks to the files in the
directory you want to read. Then, if the old set is moved out of the way,
the hardlinks would still point to the original files. The reading job
reads from the hardlinks and cleans them up when done. If the hardlinks
are placed in a directory named after the reading job-id, then garbage
collection should be possible for crashed jobs if normal cleanup fails.

Greetings,

Joep


Re: concurrency

Posted by Harsh J <ha...@cloudera.com>.
Hey Koert,

Yes, the existence of the _SUCCESS file (created at the successful
commit-end of a job) may be checked before firing the new job on the
chosen input directory. This is consistent with what Oozie does as well.

Since the listing of input files happens after the submit() call, doing
this will "just work" :)

-- 
Harsh J