Posted to common-user@hadoop.apache.org by Yuri Pradkin <yu...@ISI.EDU> on 2008/01/14 21:33:13 UTC

writing output files in hadoop streaming

Hi,

We've been using Hadoop streaming for the last 3-4 months and
it all worked out fine except for one little problem:

in some situations a Hadoop reduce task gets multiple key groups
and we want it to write out a separate binary output file for
each group.  However, when a reduce task takes too long and
there is spare capacity, the task may be replicated on another
node and these two are basically racing each other.  One finishes
cleanly and the other is terminated.  Hadoop takes care to remove
the terminated task's output from HDFS, but since we're writing
files from scripts, it's up to us to separate the output of cleanly
finished tasks from the output of tasks that are terminated
prematurely.

Does somebody have answers to the following questions:
1. Is there an easy way to tell in a script launched by the Hadoop
   streaming, if the script was terminated before it received complete
   input?
   As far as I was able to ascertain, no signals are being sent to those
   unix-jobs.  They just stop receiving data from STDIN.  The only way
   that seems to work for me was to process all input and then write
   something to STDOUT/STDERR and see if that causes a SIGPIPE.  But
   this is ugly, I hope there is a better solution.

2. Is there any good way to write multiple HDFS files from a streaming script
   *and have Hadoop clean up those files* when it decides to destroy the
   task?  If there was just one file, I could simply use STDOUT, but dumping
   multiple binary files to STDOUT is not pretty.
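
A minimal Python sketch of the probe described in question 1, for readers
who want to try it.  The function name downstream_alive is made up for
illustration.  It assumes the reducer is a Python script; Python ignores
SIGPIPE by default, so the broken pipe shows up as a BrokenPipeError
instead of killing the process.

```python
import sys


def downstream_alive(stream=None):
    """Return True if a one-byte write to `stream` succeeds, False on a broken pipe.

    `stream` is a binary file object; it defaults to the raw stdout buffer.
    """
    stream = sys.stdout.buffer if stream is None else stream
    try:
        stream.write(b"\n")  # the probe itself: one empty record
        stream.flush()
        return True
    except BrokenPipeError:
        # The reader went away, which is what a terminated task looks like
        # from inside the script.
        return False
```

The script would call this once after it has consumed all of STDIN and
treat a False result as "this attempt was killed".  The probe adds an empty
line to the task output, so it is a workaround rather than a clean answer.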

We are writing output files to an NFS partition shared among all reducers, which
makes it all slightly more complicated because of possible file overwrites.

Our current solution, which is not pretty but avoids directly addressing this
problem, is to write out files with random names (created with mktemp) and write
to STDOUT the command that renames each file to its desired name.  Then, as a
post-processing stage, I execute all those commands and delete the remaining
temporary files as duplicates/incompletes.
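
A rough Python sketch of this scheme, under some simplifying assumptions:
instead of a shell rename command, the reducer emits a tab-separated
"temporary name, final name" record, temporary files carry a "tmp-" prefix,
and final names never start with that prefix.  The function names are
hypothetical.

```python
import os
import tempfile


def write_side_effect(out_dir, final_name, data):
    """Write `data` under a random name and return the record to print on STDOUT."""
    fd, tmp_path = tempfile.mkstemp(dir=out_dir, prefix="tmp-")
    with os.fdopen(fd, "wb") as f:
        f.write(data)
    return "%s\t%s" % (os.path.basename(tmp_path), final_name)


def commit_side_effects(out_dir, committed_records):
    """Post-processing: rename what Hadoop committed, delete everything else."""
    for record in committed_records:
        tmp_name, final_name = record.rstrip("\n").split("\t")
        os.replace(os.path.join(out_dir, tmp_name),
                   os.path.join(out_dir, final_name))
    # Whatever still has the temporary prefix came from a duplicate or
    # terminated task whose STDOUT was discarded by Hadoop.
    for name in os.listdir(out_dir):
        if name.startswith("tmp-"):
            os.remove(os.path.join(out_dir, name))
```

The records passed to commit_side_effects would be the lines of the job's
committed output, so only files from tasks that finished cleanly get renamed.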

Thanks,

  -Yuri

Re: writing output files in hadoop streaming

Posted by Yuri Pradkin <yu...@ISI.EDU>.
On Tue, Jan 15, 2008 at 09:09:07AM -0800, Ted Dunning wrote:
> 
> Regarding the race condition, hadoop builds task specific temporary
> directories in the output directory, one per reduce task, that hold these
> output files (as long as you don't use absolute path names).  When the
> process completes successfully, the output files from that temporary
> directory are moved to the correct place and the temporary task-specific
> directory is deleted.  If the reduce task dies or is superseded by another
> task, then the directory is simply deleted.  The file is not kept in memory
> pending write.

That sounds like the "generic" record writer I sketched might work.  I'd love
to hear your comments on it.

Are there any bindings (e.g. for Perl) that would let our scripts write to
files in those temp directories (on HDFS)?  That could be another solution to
our problem.

> I am curious about how to demarcate the image boundaries in your current
> output.  Hadoop streaming makes the strong presumption of line orientation.
> If that isn't valid for your output, then you may have a program that is
> only accidentally working by finding line boundaries in binary data.  In
> particular, you may someday have a situation where some of the data has one
> kind of line boundary that is recognized, but on output the corresponding
> boundary is generated in a different form.  For instance, if your program
> sees CR-LF, it might take the pair as a line boundary and emit just LF.
> Even if this is not happening now, you may be in for some trouble later.

Currently we are not using any image boundaries.  Our current reducer bunches
up all records with the same key and feeds them to an image generation program
that writes to a unique file name on NFS (I described it in the previous post).
Each image is in a separate file.  The image boundary is the key boundary.

The "generic" record writer for multiple files would take a file size as an
argument, so it can know how long the rawbytes field is.
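
To make the <filename, filesize, rawbytes[filesize]> idea concrete, here is
a small Python sketch of one possible framing.  The exact layout (a
"filename TAB size NEWLINE" header followed by the raw bytes) and the
function names are assumptions for illustration, not an existing Hadoop
format, and file names must not contain tabs or newlines.

```python
import io


def pack_record(filename, payload):
    """Frame one file as b'<filename>\\t<size>\\n' followed by the raw bytes."""
    header = ("%s\t%d\n" % (filename, len(payload))).encode("utf-8")
    return header + payload


def unpack_records(stream):
    """Yield (filename, payload) pairs from a binary stream of packed records."""
    while True:
        header = stream.readline()
        if not header:
            return  # clean end of stream
        filename, size = header.decode("utf-8").rstrip("\n").split("\t")
        payload = stream.read(int(size))
        if len(payload) != int(size):
            raise ValueError("truncated record for %s" % filename)
        yield filename, payload
```

Because the reader trusts the size field rather than looking for line
boundaries, the payload may contain any bytes, including CR and LF.  Whether
such a byte stream passes through streaming intact is a separate question,
given the line-orientation caveat quoted above.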

Thanks!

  -Yuri

> 
> 
> On 1/15/08 8:57 AM, "Yuri Pradkin" <yu...@ISI.EDU> wrote:
> 
> > Well, in our case the reducer munches key-value pairs to
> > generate images; it's conceivable that we'll have other
> > reducers in the future to do other interesting things.
> > So, it would be impractical to move all that code into
> > a RecordWriter.  We don't want to have a new  RecordWriter
> > for each new job, and we'd like to keep our processing
> > code in languages other than Java, which is the only reason
> > to use streaming, right?
> > 
> > 
> > Do you think it would be a good solution to come up with a
> > "generic" version of a record writer that would take as
> > input, say: 
> > <filename, filesize, rawbytes[filesize]>
> > and do the actual writing?
> > 
> > Will Hadoop guarantee that only one "filename" will be
> > created/written to even if there are racing tasks, and that the
> > file will not be corrupted?
> > 
> > And what about memory requirements? -- When filesize is large,
> > would it all have to be stored in memory before it's written,
> > or will Hadoop cache it in a temp file?
> > 
> > Thanks much for your input.
> > 
> >   -Yuri 
> > 
> > On Mon, Jan 14, 2008 at 01:06:13PM -0800, Runping Qi wrote:
> >> 
> >> One way to achieve your goal is to implement your own
> >> OutputFormat/RecordWriter classes.
> >> Your reducer will emit all the key/value pairs as in the normal case.
> >> Your record writer class can open multiple output files and dispatch
> >> the key/value pairs to the appropriate files based on the actual values.
> >> This way, the Hadoop framework takes care of all the issues related to the
> >> namespace and the necessary cleanup of the output files.
> >> 
> >> 
> >> Runping
> >>  
> >> 

Re: writing output files in hadoop streaming

Posted by John Heidemann <jo...@isi.edu>.
>On 1/15/08 12:54 PM, "Miles Osborne" <mi...@inf.ed.ac.uk> wrote:
>
>> surely the clean way (in a streaming environment) would be to define a
>> representation of some kind which serialises the output.
>> 
>> http://en.wikipedia.org/wiki/Serialization
>> 
>> after your mappers and reducers have completed, you would then have some
>> code which deserialises (unpacks) the output as desired.  this would easily
>> allow you to reconstruct the two files from a single set of file
>> fragments.


On Tue, 15 Jan 2008 12:56:12 PST, Ted Dunning wrote: 
>Also, this gives you a solution to your race condition (by using hadoop's
>mechanisms) and it also gives you much higher
>throughput/reliability/scalability than writing to NFS can possibly give
>you.
>

I agree that serializing and using the standard Hadoop output stream
best leverages the Hadoop mechanisms.  I even labeled it the "proper"
way, and talked about serialization (without using that word):

>> On 15/01/2008, John Heidemann <jo...@isi.edu> wrote:
>>...
>>> There's a second way, which is where most of the discussion has gone,
>>> call it the "proper" way:
>>> 
>>> Rather than writing files as side-effects, the argument is to just
>>> output the data with the standard hadoop mechanism.  In streaming, this
>>> means through stdout.
>>>...
>>> But I actually think this is not viable for us,
>>> because we're writing images which are binary.
>>>...
>>> If we go that way, then we're basically packing many files into one.
>>> To me it seems cleanest, if one wants to do that, to use some
>>> existing format, like tar or zip or cpio, or maybe the hadoop multi-file
>>> format.  But this way seems fraught with peril, since we have to fight
>>> streaming and custom record output, and then still extract the files
>>> after output completes anyway.  Lots and lots of work---it feels like
>>> this can't be right.
>>> 
>>> (Another hacky way to make this work in streaming is to convert binary
>>> to ASCII, like base-64-izing the files.  Been there in SQL.  Done that.
>>> Don't want to do it again.  It still has all the encoding and
>>> post-processing junk. :-)

BUT...I'm suggesting that should not be the ONLY viable way.

Two reasons:  first, yes, serialization can work.  But you've put a lot
of layers of junk in the way, all of which has to be done and undone.
This can easily become a lot of code, and it can easily eat into any
performance advantage.

On the other hand, if Hadoop would just send a signal to the terminated
reducer, rather than just closing stdin/stdout, then a few lines of
signal-capture code and a few more to unlink the temp file would do
everything on the reducer side, plus a few lines of signal-sending code
in Hadoop streaming.  Add a few on the commit side, and you end up with
about 50 lines of code.  Compare that with serialization, which is hundreds
of lines of stubs, or large libraries to handle something like tar or zip,
plus potential storage overhead (if you convert to base-64 or something),
plus more storage overhead because you have to store (at least temporarily)
both serialized and unserialized versions.
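
For illustration, the reducer side of that proposal might look like the
Python sketch below.  It is hypothetical: it assumes streaming would
deliver SIGTERM to a canceled reducer, which is exactly what the thread says
does not happen today, and the function names are made up.

```python
import os
import signal
import sys


def abort_side_effects(paths):
    """Remove partially written side-effect files, ignoring ones already gone."""
    for path in paths:
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass


def install_abort_handler(paths, signum=signal.SIGTERM):
    """Arrange for `paths` to be unlinked if the (hypothetical) signal arrives."""
    def handler(received_signum, frame):
        abort_side_effects(paths)
        sys.exit(1)  # exit non-zero so the attempt is not mistaken for a success
    signal.signal(signum, handler)
```

A reducer would call install_abort_handler with its list of temporary files
before reading STDIN, and rename them into place (the commit) only after the
last record has been processed.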

Second, the Google folks found side-effects useful enough that they
support them, documented them in Dean and Ghemawat, and seem to use them
internally.  Perhaps Hadoop should consider the costs of supporting
side-effects before discarding them?


Going back to part of Ted's comment and his performance objection:

On Tue, 15 Jan 2008 12:56:12 PST, Ted Dunning wrote: 
>Also, this gives you a solution to your race condition (by using hadoop's
>mechanisms) and it also gives you much higher
>throughput/reliability/scalability than writing to NFS can possibly give
>you.

About the throughput issues: if you don't want to write to NFS (we can
at our current cluster size, but I know others are luckier than us :-),
just write side-effect files into HDFS to get all the
throughput/reliability/scalability you would get with Hadoop's standard
mechanisms.


To try to clarify, though, I think the answer I'm hearing to my
question:

>So what do the Hadoop architects think about side-effects and recovering
>from half-run jobs?  Does hadoop intend to support side-effects (for
>interested users, obviously not as standard practice)?  If we were in
>Java would we get a signal we could use to do cleanup?

is that Hadoop does NOT currently support side-effects, because people
didn't really consider them.

And there's some push-back against side-effects as being not very
clean.  (Which I agree with to a first order, but not strongly enough
that I think it should be forbidden.)

Are folks so anti-side-effect that if we submit the 10-line signal
sending patch to streaming it will be given a -1?  (Footnote: it's a
10-line patch in C; I have to confirm what it looks like in Java.)

   -John Heidemann


Re: writing output files in hadoop streaming

Posted by Ted Dunning <td...@veoh.com>.

Also, this gives you a solution to your race condition (by using hadoop's
mechanisms) and it also gives you much higher
throughput/reliability/scalability than writing to NFS can possibly give
you.


On 1/15/08 12:54 PM, "Miles Osborne" <mi...@inf.ed.ac.uk> wrote:

> surely the clean way (in a streaming environment) would be to define a
> representation of some kind which serialises the output.
> 
> http://en.wikipedia.org/wiki/Serialization
> 
> after your mappers and reducers have completed, you would then have some
> code which deserialises (unpacks) the output as desired.  this would easily
> allow you to reconstruct the two files from a single set of file
> fragments.
> 
> this approach would entail defining the serialisation / deserialisation
> process in a way which was distinct from the actual mappers / reducers and
> then having a little compilation process take that  definition and both
> create the necessary serialisers / deserialisers and also serve as
> documentation.
> 
> it does have extra overhead, but in the long run it is worth it, since the
> interfaces are actually documented.
> 
> Miles
> 
> On 15/01/2008, John Heidemann <jo...@isi.edu> wrote:
>> 
>> On Tue, 15 Jan 2008 09:09:07 PST, Ted Dunning wrote:
>>> 
>>> Regarding the race condition, hadoop builds task specific temporary
>>> directories in the output directory, one per reduce task, that hold these
>>> output files (as long as you don't use absolute path names).  When the
>>> process completes successfully, the output files from that temporary
>>> directory are moved to the correct place and the temporary task-specific
>>> directory is deleted.  If the reduce task dies or is superseded by another
>>> task, then the directory is simply deleted.  The file is not kept in memory
>>> pending write.
>>> 
>>> I am curious about how to demarcate the image boundaries in your current
>>> output.  Hadoop streaming makes the strong presumption of line orientation.
>>> If that isn't valid for your output, then you may have a program that is
>>> only accidentally working by finding line boundaries in binary data.  In
>>> particular, you may someday have a situation where some of the data has one
>>> kind of line boundary that is recognized, but on output the corresponding
>>> boundary is generated in a different form.  For instance, if your program
>>> sees CR-LF, it might take the pair as a line boundary and emit just LF.
>>> Even if this is not happening now, you may be in for some trouble
>>> later.
>> 
>> I think Yuri left out a bit about what we're doing.
>> He wasn't clear about what files we're talking about writing.
>> Let me try to clarify.
>> 
>> As context, all this is in Hadoop streaming.
>> 
>> Here's one way, the "side-effect way" (this is what we're doing now):
>> 
>> In principle, we'd like to not output ANYTHING to stdout from streaming.
>> Instead, we create new files somewhere in the shared Unix filespace.
>> Basically, these files are side-effects of the map/reduce computation.
>> 
>> This approach is described in Dean & Ghemawat section 4.5
>> (Side-effects), with the caveat that the user must be responsible for
>> making any side-effect atomic.
>> 
>> Our problem is, I think, that duplicated reducers scheduled for
>> straggler elimination can result in extra, partial side-effect files.
>> We're trying to figure out how to clean them up properly.
>> 
>> Currently it seems that prematurely terminated reducers (due to canceled
>> straggler elimination jobs) are not told they are terminated.  They just
>> get a SIGPIPE because their write destination goes away.
>> 
>> This prompted Yuri's first question:
>> 
>>>>>> 1. Is there an easy way to tell in a script launched by the Hadoop
>>>>>>    streaming, if the script was terminated before it received complete
>>>>>>    input?
>> 
>> To me, it seems that canceled jobs should get a SIGTERM or SIGUSR1 so
>> they can catch it and clean up properly.  Otherwise there seems to be no
>> clean way to distinguish a half-run job from a fully run job that
>> happens to have less input.  (I.e., no way for our reducer to do a
>> commit or abort properly.)
>> 
>> (It would be nicer to send an in-band termination signal down stdin, but
>> I don't think a streaming reducer can do that.)
>> 
>> So what do the Hadoop architects think about side-effects and recovering
>> from half-run jobs?  Does hadoop intend to support side-effects (for
>> interested users, obviously not as standard practice)?  If we were in
>> Java would we get a signal we could use to do cleanup?
>> 
>> What do the Hadoop streaming people think?  Is this just a bug that
>> streaming is not propagating a signal that appears in Javaland?
>> 
>> 
>> 
>> There's a second way, which is where most of the discussion has gone,
>> call it the "proper" way:
>> 
>> Rather than writing files as side-effects, the argument is to just
>> output the data with the standard hadoop mechanism.  In streaming, this
>> means through stdout.
>> 
>> Which prompted Yuri's second question:
>>>>>> 2. Is there any good way to write multiple HDFS files from a streaming
>>>>>>    script *and have Hadoop cleanup those files* when it decides to
>>>>>>    destroy the task?  If there was just one file, I could simply use
>>>>>>    STDOUT, but dumping multiple binary files to STDOUT is not pretty.
>> 
>> But I actually think this is not viable for us,
>> because we're writing images which are binary.
>> As per Doug's comment:
>> 
>>> If that isn't valid for your output, then you may have a program that is
>>> only accidentally working by finding line boundaries in binary data.
>> 
>> (Doug, we're not doing it this way right now.)
>> 
>> That said, if it worked, this way is clearly a lot cleaner, since Hadoop
>> already handles commit/abort for half-run jobs.  Basically all of our
>> half-run problems go away.  But they're replaced with File Format
>> Problems.
>> 
>> If we were in Java, we could write our own OutputFormat/RecordWriter
>> classes.  This is what Runping suggested and Yuri was discussing.  I don't
>> think that works for us (because we're not in Java, although I suppose it
>> might be made to work).
>> 
>> If we go that way, then we're basically packing many files into one.
>> To me it seems cleanest, if one wants to do that, to use some
>> existing format, like tar or zip or cpio, or maybe the hadoop multi-file
>> format.  But this way seems fraught with peril, since we have to fight
>> streaming and custom record output, and then still extract the files
>> after output completes anyway.  Lots and lots of work---it feels like
>> this can't be right.
>> 
>> (Another hacky way to make this work in streaming is to convert binary
>> to ASCII, like base-64-izing the files.  Been there in SQL.  Done that.
>> Don't want to do it again.  It still has all the encoding and
>> post-processing junk. :-)
>> 
>> 
>> 
>> Yuri had a very clever hack that merges the two schemes.  He writes to
>> random filenames as side-effects, but then writes the side-effect
>> filenames as hadoop output.  Therefore Hadoop handles commit/abort, and
>> post run he just collects the files that appear in Hadoop's part-*
>> output and discards the others.
>> 
>> This hack works, but IMHO the reducer should do the commit/abort of
>> side-effects, not some post-processing job.
>> 
>> 
>> So any thoughts about supporting side-effects?
>> 
>> 
>>    -John
>> 


Re: writing output files in hadoop streaming

Posted by Miles Osborne <mi...@inf.ed.ac.uk>.
surely the clean way (in a streaming environment) would be to define a
representation of some kind which serialises the output.

http://en.wikipedia.org/wiki/Serialization

after your mappers and reducers have completed, you would then have some
code which deserialises (unpacks) the output as desired.  this would easily
allow you to reconstruct the two files from a single set of file
fragments.

this approach would entail defining the serialisation / deserialisation
process in a way which was distinct from the actual mappers / reducers and
then having a little compilation process take that  definition and both
create the necessary serialisers / deserialisers and also serve as
documentation.

it does have extra overhead, but in the long run it is worth it, since the
interfaces are actually documented.

Miles


Re: writing output files in hadoop streaming

Posted by John Heidemann <jo...@isi.edu>.
On Tue, 15 Jan 2008 09:09:07 PST, Ted Dunning wrote: 
>
>Regarding the race condition, hadoop builds task specific temporary
>directories in the output directory, one per reduce task, that hold these
>output files (as long as you don't use absolute path names).  When the
>process completes successfully, the output files from that temporary
>directory are moved to the correct place and the temporary task-specific
>directory is deleted.  If the reduce task dies or is superseded by another
>task, then the directory is simply deleted.  The file is not kept in memory
>pending write.
>
>I am curious about how to demarcate the image boundaries in your current
>output.  Hadoop streaming makes the strong presumption of line orientation.
>If that isn't valid for your output, then you may have a program that is
>only accidentally working by finding line boundaries in binary data.  In
>particular, you may someday have a situation where some of the data has one
>kind of line boundary that is recognized, but on output the corresponding
>boundary is generated in a different form.  For instance, if your program
>sees CR-LF, it might take the pair as a line boundary and emit just LF.
>Even if this is not happening now, you may be in for some trouble
>later.

I think Yuri left out a bit about what we're doing.
He wasn't clear about what files we're talking about writing.
Let me try to clarify.

As context, all this is in Hadoop streaming.

Here's one way, the "side-effect way" (this is what we're doing now):

In principle, we'd like to not output ANYTHING to stdout from streaming.
Instead, we create new files somewhere in the shared Unix filespace.
Basically, these files are side-effects of the map/reduce computation.

This approach is described in Dean & Ghemawat section 4.5
(Side-effects), with the caveat that the user must be responsible for
making any side-effect atomic.
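
One common way to make a file-writing side-effect atomic is to write to a
temporary name and rename it into place once it is complete.  The Python
sketch below assumes the file system provides atomic rename within a
directory, as POSIX does for local file systems; whether a particular NFS
setup gives the same guarantee is worth checking.  The function name and the
".partial-" prefix are made up for illustration.

```python
import os
import tempfile


def atomic_write(final_path, data):
    """Write `data` to a temporary file, then rename it onto `final_path`."""
    directory = os.path.dirname(final_path) or "."
    # The temporary file lives in the same directory so the rename never
    # has to cross a file system boundary.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".partial-")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, final_path)  # readers see the old file or the new one
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```

With this pattern two racing reducers can both produce the same final name
without a reader ever seeing a half-written file.  It does not clean up the
".partial-" file of a reducer that is killed outright, which is the problem
discussed next.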

Our problem is, I think, that duplicated reducers scheduled for
straggler elimination can result in extra, partial side-effect files.
We're trying to figure out how to clean them up properly.

Currently it seems that prematurely terminated reducers (due to canceled
straggler elimination jobs) are not told they are terminated.  They just
get a SIGPIPE because their write destination goes away.

This prompted Yuri's first question:

>>>> 1. Is there an easy way to tell in a script launched by the Hadoop
>>>>    streaming, if the script was terminated before it received complete
>>>>    input?

To me, it seems that canceled jobs should get a SIGTERM or SIGUSR1 so
they can catch it and clean up properly.  Otherwise there seems to be no
clean way to distinguish a half-run job from a fully run job that
happens to have less input.  (I.e., no way for our reducer to do a
commit or abort properly.)

(It would be nicer to send an in-band termination signal down stdin, but
I don't think a streaming reducer can do that.)

So what do the Hadoop architects think about side-effects and recovering
from half-run jobs?  Does hadoop intend to support side-effects (for
interested users, obviously not as standard practice)?  If we were in
Java would we get a signal we could use to do cleanup?

What do the Hadoop streaming people think?  Is this just a bug that
streaming is not propagating a signal that appears in Javaland?



There's a second way, which is where most of the discussion has gone,
call it the "proper" way:

Rather than writing files as side-effects, the argument is to just
output the data with the standard hadoop mechanism.  In streaming, this
means through stdout.

Which prompted Yuri's second question:
>>>> 2. Is there any good way to write multiple HDFS files from a streaming
>>>>    script *and have Hadoop cleanup those files* when it decides to
>>>>    destroy the task?  If there was just one file, I could simply use
>>>>    STDOUT, but dumping multiple binary files to STDOUT is not pretty.

But I actually think this is not viable for us,
because we're writing images which are binary.
As per Doug's comment:

>If that isn't valid for your output, then you may have a program that is
>only accidentally working by finding line boundaries in binary data. 

(Doug, we're not doing it this way right now.)

That said, if it worked, this way is clearly a lot cleaner, since Hadoop
already handles commit/abort for half-run jobs.  Basically all of our
half-run problems go away.  But they're replaced with File Format
Problems.

If we were in Java, we could write our own OutputFormat/RecordWriter
classes.  This is what Runping suggested and Yuri was discussing.  I
don't think that works for us (because we're not in Java, although I
suppose it might be made to work).

If we go that way, then we're basically packing many files into one.
To me it seems cleanest, if one wants to do that, to use some
existing format, like tar or zip or cpio, or maybe the hadoop multi-file
format.  But this way seems fraught with peril, since we have to fight
streaming and custom record output, and then still extract the files
after output completes anyway.  Lots and lots of work---it feels like
this can't be right.
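
For illustration only, the packing and extracting halves of that idea are
short with Python's standard tarfile module.  The helper names pack_files
and unpack_files are made up for this sketch.  Note that the resulting
archive is still binary, so it does not by itself solve the problem of
getting the bytes through streaming's line-oriented stdout.

```python
import io
import tarfile


def pack_files(files):
    """Pack a {name: bytes} mapping into one in-memory tar archive."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as archive:
        for name, data in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)  # tar needs the size before the payload
            archive.addfile(info, io.BytesIO(data))
    return buf.getvalue()


def unpack_files(blob):
    """Recover the {name: bytes} mapping from a tar archive."""
    out = {}
    with tarfile.open(fileobj=io.BytesIO(blob), mode="r") as archive:
        for member in archive.getmembers():
            out[member.name] = archive.extractfile(member).read()
    return out
```

The extraction step is exactly the post-processing pass complained about
above: it has to run after the job completes, over every output part.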

(Another hacky way to make this work in streaming is to convert binary to
ASCII, e.g. base64-encode the files.  Been there in SQL.  Done that.
Don't want to do it again.  It still has all the encoding and
post-processing junk. :-)
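
For reference, the base64 route amounts to something like the sketch
below: one text line per file, with the name and the encoded payload
separated by a tab.  The record layout and the function names are
assumptions made for this example, and the name must not itself contain a
tab or newline.

```python
import base64


def encode_record(name, data):
    """One text line per file: name, a tab, then the base64 payload."""
    return name + "\t" + base64.b64encode(data).decode("ascii") + "\n"


def decode_record(line):
    """Inverse of encode_record; returns (name, original bytes)."""
    name, payload = line.rstrip("\n").split("\t", 1)
    return name, base64.b64decode(payload)
```

The encoded form is roughly a third larger than the binary, and every
output line still has to be decoded after the job finishes.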


	
Yuri had a very clever hack that merges the two schemes.  He writes to
random filenames as side-effects, but then writes the side-effect
filenames as hadoop output.  Therefore Hadoop handles commit/abort, and
post run he just collects the files that appear in Hadoop's part-*
output and discards the others.
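
A rough sketch of that post-run collection step, assuming each committed
output line has the form "<temp name><TAB><desired name>" and that every
temp file carries a recognizable prefix.  Both are assumptions for this
example; Yuri's actual scripts emit rename commands instead.

```python
import os


def finalize_side_effects(part_lines, side_dir, tmp_prefix="tmp."):
    """Keep only side-effect files named in committed output; drop the rest.

    part_lines: lines of "<temp name>\t<desired name>" read from the
    part-* files of the finished job.
    """
    kept = []
    for line in part_lines:
        line = line.rstrip("\n")
        if not line:
            continue
        tmp_name, final_name = line.split("\t", 1)
        os.replace(os.path.join(side_dir, tmp_name),
                   os.path.join(side_dir, final_name))
        kept.append(final_name)
    # Anything still carrying the temp prefix came from a killed or
    # duplicate task, because its name never reached committed output.
    for name in os.listdir(side_dir):
        if name.startswith(tmp_prefix):
            os.remove(os.path.join(side_dir, name))
    return kept
```

This only works if the desired names never start with the temp prefix,
and it must not run until the whole job has finished.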

This hack works, but IMHO the reducer should do the commit/abort of
side-effects, not some post-processing job.


So any thoughts about supporting side-effects?


   -John

Re: writing output files in hadoop streaming

Posted by Ted Dunning <td...@veoh.com>.
Regarding the race condition, Hadoop builds task-specific temporary
directories in the output directory, one per reduce task, that hold these
output files (as long as you don't use absolute path names).  When the
process completes successfully, the output files from that temporary
directory are moved to the correct place and the temporary task-specific
directory is deleted.  If the reduce task dies or is superseded by another
task, then the directory is simply deleted.  The file is not kept in memory
pending write.
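
The pattern can be illustrated on a local filesystem as follows.  This is
a sketch of the idea only, not Hadoop's implementation; run_with_task_dir
and the "_task_" prefix are invented for the example.

```python
import os
import shutil
import tempfile


def run_with_task_dir(output_dir, produce):
    """Run produce(task_dir); promote its files on success, discard on failure."""
    task_dir = tempfile.mkdtemp(prefix="_task_", dir=output_dir)
    try:
        produce(task_dir)
    except BaseException:
        # Failed or killed: nothing from this attempt reaches output_dir.
        shutil.rmtree(task_dir)
        raise
    # Success: move each file to its final place, then drop the empty dir.
    for name in os.listdir(task_dir):
        os.replace(os.path.join(task_dir, name), os.path.join(output_dir, name))
    os.rmdir(task_dir)
```

Because every attempt writes into its own directory, two racing attempts
of the same task never touch each other's files.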

I am curious about how you demarcate the image boundaries in your current
output.  Hadoop streaming makes the strong presumption of line orientation.
If that isn't valid for your output, then you may have a program that is
only accidentally working by finding line boundaries in binary data.  In
particular, you may someday have a situation where some of the data has one
kind of line boundary that is recognized, but on output the corresponding
boundary is generated in a different form.  For instance, if your program
sees CR-LF, it might take the pair as a line boundary and emit just LF.
Even if this is not happening now, you may be in for some trouble later.
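
A small Python demonstration of that failure mode, using the 8-byte PNG
file signature as the binary data.  The function naive_line_copy is a
stand-in for any line-oriented stage and is not part of Hadoop.

```python
def naive_line_copy(data):
    """Split on line boundaries and re-emit each line terminated by LF."""
    return b"".join(line + b"\n" for line in data.splitlines())


png_signature = b"\x89PNG\r\n\x1a\n"
copied = naive_line_copy(png_signature)

# The CR-LF pair was read as one boundary and written back as a bare LF,
# so the copy is one byte shorter and no longer a valid PNG header.
print(copied == png_signature)  # prints False
```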


On 1/15/08 8:57 AM, "Yuri Pradkin" <yu...@ISI.EDU> wrote:

> Well, in our case the reducer munches key-value pairs to
> generate images; it's conceivable that we'll have other
> reducers in the future to do other interesting things.
> So, it would be impractical to move all that code into
> a RecordWriter.  We don't want to have a new  RecordWriter
> for each new job, and we'd like to keep our processing
> code in languages other than Java, which is the only reason
> to use streaming, right?
> 
> 
> Do you think it would be a good solution to come up with a
> "generic" version of a record writer that would take as
> input, say: 
> <filename, filesize, rawbytes[filesize]>
> and do the actual writing?
> 
> Will Hadoop guarantee that only one "filename" will be
> created/written to, even if there are racing tasks, and that the
> file will not be corrupted?
> 
> And what about memory requirements?  When filesize is large,
> would it all have to be stored in memory before it's written,
> or will Hadoop cache it in a temp file?
> 
> Thanks much for your input.
> 
>   -Yuri 
> 
> On Mon, Jan 14, 2008 at 01:06:13PM -0800, Runping Qi wrote:
>> 
>> One way to achieve your goal is to implement your own
>> OutputFormat/RecordWriter classes.
>> Your reducer will emit all the key/value pairs as in the normal case.
>> Your record writer class can open multiple output files and dispatch
>> the key/value to appropriate files based on the actual values.
>> This way, the Hadoop framework takes care of all the issues related to the
>> namespace and the necessary cleanup of the output files.
>> 
>> 
>> Runping


Re: writing output files in hadoop streaming

Posted by Yuri Pradkin <yu...@ISI.EDU>.
Well, in our case the reducer munches key-value pairs to
generate images; it's conceivable that we'll have other
reducers in the future to do other interesting things.
So, it would be impractical to move all that code into
a RecordWriter.  We don't want to have a new  RecordWriter 
for each new job, and we'd like to keep our processing 
code in languages other than Java, which is the only reason
to use streaming, right?


Do you think it would be a good solution to come up with a 
"generic" version of a record writer that would take as 
input, say: 
	<filename, filesize, rawbytes[filesize]> 
and do the actual writing?

Will Hadoop guarantee that only one "filename" will be
created/written to, even if there are racing tasks, and that the
file will not be corrupted?

And what about memory requirements?  When filesize is large,
would it all have to be stored in memory before it's written,
or will Hadoop cache it in a temp file?
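
To make the proposed record layout concrete, here is a rough Python sketch
of the framing on both sides.  The header encoding (2-byte name length and
8-byte file size, big-endian) and the function names are assumptions for
this example only; nothing like it exists in Hadoop as far as this thread
shows.  Copying in fixed-size chunks keeps memory use bounded no matter
how large filesize is.

```python
import struct

HEADER = struct.Struct(">HQ")  # filename length (2 bytes), file size (8 bytes)


def write_record(stream, filename, source, size, chunk=64 * 1024):
    """Emit <filename, filesize, rawbytes[filesize]>, copying chunk by chunk."""
    name = filename.encode("utf-8")
    stream.write(HEADER.pack(len(name), size))
    stream.write(name)
    remaining = size
    while remaining:
        block = source.read(min(chunk, remaining))
        if not block:
            raise EOFError("source ended before filesize bytes were read")
        stream.write(block)
        remaining -= len(block)


def read_record(stream, open_sink, chunk=64 * 1024):
    """Copy one record's payload to open_sink(filename); return False at EOF."""
    header = stream.read(HEADER.size)
    if not header:
        return False
    if len(header) < HEADER.size:
        raise EOFError("truncated header")
    name_len, size = HEADER.unpack(header)
    filename = stream.read(name_len).decode("utf-8")
    sink = open_sink(filename)
    remaining = size
    while remaining:
        block = stream.read(min(chunk, remaining))
        if not block:
            raise EOFError("truncated record: the writer was probably killed")
        sink.write(block)
        remaining -= len(block)
    return True
```

Because the size comes before the payload, the reading side can detect a
record that was cut short, which is one way to recognize a half-run task.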

Thanks much for your input.

  -Yuri 

On Mon, Jan 14, 2008 at 01:06:13PM -0800, Runping Qi wrote:
> 
> One way to achieve your goal is to implement your own
> OutputFormat/RecordWriter classes. 
> Your reducer will emit all the key/value pairs as in the normal case.
> Your record writer class can open multiple output files and dispatch
> the key/value to appropriate files based on the actual values.
> This way, the Hadoop framework takes care of all the issues related to the
> namespace and the necessary cleanup of the output files.
> 
> 
> Runping

RE: writing output files in hadoop streaming

Posted by Runping Qi <ru...@YAHOO-INC.COM>.
One way to achieve your goal is to implement your own
OutputFormat/RecordWriter classes. 
Your reducer will emit all the key/value pairs as in the normal case.
Your record writer class can open multiple output files and dispatch
the key/value to appropriate files based on the actual values.
This way, the Hadoop framework takes care of all the issues related to the
namespace and the necessary cleanup of the output files.


Runping
 
