Posted to mapreduce-issues@hadoop.apache.org by "Jay Hacker (JIRA)" <ji...@apache.org> on 2013/02/21 16:44:12 UTC

[jira] [Updated] (MAPREDUCE-5018) Support raw binary data with Hadoop streaming

     [ https://issues.apache.org/jira/browse/MAPREDUCE-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Hacker updated MAPREDUCE-5018:
----------------------------------

    Target Version/s: trunk
        Release Note: Add "-io justbytes" I/O format to allow raw binary streaming.
              Status: Patch Available  (was: Open)

This patch adds a 'JustBytesWritable' and supporting InputFormat, OutputFormat, InputWriter, and OutputReader classes to support passing raw, unmodified, unaugmented bytes through Hadoop streaming.  The purpose is to be able to run arbitrary Unix filters on entire binary files stored in HDFS as map-only jobs, taking advantage of the locality and reliability offered by Hadoop.

The code is very straightforward; most methods are only one line.

A few design notes:

1. Data is stored in a JustBytesWritable, which is the simplest possible Writable wrapper around a byte[].  It literally just reads until the buffer is full or EOF and remembers the number of bytes (see the sketch after these notes).

2. Data is read by JustBytesInputFormat in 64K chunks by default and stored in a JustBytesWritable key; the value is a NullWritable, but no value is ever read or written.  The key is used instead of the value to allow the possibility of using it in a reduce.

3. Input files are never split, as most programs are not able to handle splits.

4. Input files are not decompressed, for three reasons: the purpose is to get raw data to a program; people may want to operate on compressed data directly (e.g., md5sum on archives); and most tools do not expect automatic decompression, so this is the "least surprising" option.  It's also trivial to throw a "zcat" in front of your filter.

5. Output is even simpler than input: it just writes the bytes of the JustBytesWritable key to the output stream.  Output is never compressed, for reasons similar to the above.

6. The code uses the old mapred API, as that is what streaming uses.
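
To make notes 1-4 concrete, here is a rough sketch of the shape of the input-side code (simplified, with error handling and configuration abbreviated; see the attached patch for the real thing):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.EOFException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    // Note 1: the simplest possible Writable wrapper around a byte[].
    // (Two top-level classes are shown in one listing for brevity; they
    // would live in separate source files.)
    public class JustBytesWritable implements Writable {
      private byte[] bytes;
      private int length;

      public JustBytesWritable() { this(65536); }            // 64K default buffer
      public JustBytesWritable(int size) { bytes = new byte[size]; }

      public byte[] getBytes() { return bytes; }
      public int getLength() { return length; }
      public void setLength(int length) { this.length = length; }

      // Write exactly the bytes we hold: no length prefix, no framing.
      public void write(DataOutput out) throws IOException {
        out.write(bytes, 0, length);
      }

      // Read until the buffer is full or EOF; remember the number of bytes.
      public void readFields(DataInput in) throws IOException {
        length = 0;
        try {
          while (length < bytes.length) {
            bytes[length++] = in.readByte();
          }
        } catch (EOFException eof) {
          // Partial final chunk: length already holds the byte count.
        }
      }
    }

    // Notes 2-4: whole files as raw bytes; no splitting, no decompression.
    public class JustBytesInputFormat extends FileInputFormat<JustBytesWritable, NullWritable> {

      @Override
      protected boolean isSplitable(FileSystem fs, Path file) {
        return false;                       // Note 3: never split input files.
      }

      @Override
      public RecordReader<JustBytesWritable, NullWritable> getRecordReader(
          InputSplit split, JobConf job, Reporter reporter) throws IOException {
        final FileSplit fileSplit = (FileSplit) split;
        final Path path = fileSplit.getPath();
        // Note 4: open the raw stream directly; no codec lookup, so no decompression.
        final FSDataInputStream in = path.getFileSystem(job).open(path);

        return new RecordReader<JustBytesWritable, NullWritable>() {
          private long pos = 0;

          public boolean next(JustBytesWritable key, NullWritable value) throws IOException {
            int n = in.read(key.getBytes());   // Note 2: up to one 64K buffer per record.
            if (n <= 0) return false;
            key.setLength(n);
            pos += n;
            return true;
          }

          public JustBytesWritable createKey() { return new JustBytesWritable(); }
          public NullWritable createValue() { return NullWritable.get(); }
          public long getPos() { return pos; }
          public float getProgress() {
            long len = fileSplit.getLength();
            return len == 0 ? 1.0f : Math.min(1.0f, pos / (float) len);
          }
          public void close() throws IOException { in.close(); }
        };
      }
    }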

Streaming inserts an InputWriter between the InputFormat and the map executable, and an OutputReader between the map executable and the OutputFormat; the JustBytes versions simply pass the key bytes straight through.
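
Concretely, the input side of that pass-through looks roughly like this (again a simplified sketch; the OutputReader is the mirror image, filling a JustBytesWritable from the executable's stdout):

    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.streaming.PipeMapRed;
    import org.apache.hadoop.streaming.io.InputWriter;

    // Pipe the raw key bytes to the map executable's stdin, nothing more.
    public class JustBytesInputWriter extends InputWriter<JustBytesWritable, NullWritable> {
      private DataOutput clientOut;

      @Override
      public void initialize(PipeMapRed pipeMapRed) throws IOException {
        super.initialize(pipeMapRed);
        clientOut = pipeMapRed.getClientOutput();   // the executable's stdin
      }

      @Override
      public void writeKey(JustBytesWritable key) throws IOException {
        clientOut.write(key.getBytes(), 0, key.getLength());  // no separators, no framing
      }

      @Override
      public void writeValue(NullWritable value) throws IOException {
        // No value is ever written.
      }
    }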

I've augmented IdentifierResolver to recognize "-io justbytes" on the command line and set the input/output classes appropriately.
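
That change is just one more branch in IdentifierResolver.resolve(); schematically (constant and class names as used in this patch):

    // Sketch of the new case in IdentifierResolver.resolve(String identifier):
    public static final String JUST_BYTES_ID = "justbytes";

    if (identifier.equalsIgnoreCase(JUST_BYTES_ID)) {
      setInputWriterClass(JustBytesInputWriter.class);
      setOutputReaderClass(JustBytesOutputReader.class);
      setOutputKeyClass(JustBytesWritable.class);
      setOutputValueClass(NullWritable.class);
    }
    // ... existing "text", "rawbytes", and "typedbytes" cases unchanged ...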

I've included a shell script called "mapstream" to run streaming with all the required command-line parameters; it makes running a binary map-only job as easy as:

    mapstream indir command outdir

which runs "command" on every file in indir and writes the results to outdir.
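
Under the hood, mapstream amounts to something like the following invocation (a sketch: the streaming jar path and the fully qualified format class names depend on the install and on where these classes finally land):

    #!/bin/sh
    # mapstream <indir> <command> <outdir>
    hadoop jar "$HADOOP_HOME"/contrib/streaming/hadoop-streaming-*.jar \
        -D mapred.reduce.tasks=0 \
        -io justbytes \
        -inputformat JustBytesInputFormat \
        -outputformat JustBytesOutputFormat \
        -input "$1" \
        -mapper "$2" \
        -output "$3"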

I welcome feedback, especially if there is an even simpler way to do this.  I'm not hung up on the JustBytes name; I'd be happy to switch to a better one.  If people like the general approach, I will add unit tests and resubmit.  Also, please let me know if I should break this into separate patches for common and mapreduce.
                
> Support raw binary data with Hadoop streaming
> ---------------------------------------------
>
>                 Key: MAPREDUCE-5018
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5018
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: contrib/streaming
>            Reporter: Jay Hacker
>            Priority: Minor
>
> People often have a need to run older programs over many files, and turn to Hadoop streaming as a reliable, performant batch system.  There are good reasons for this:
> 1. Hadoop is convenient: they may already be using it for mapreduce jobs, and it is easy to spin up a cluster in the cloud.
> 2. It is reliable: HDFS replicates data and the scheduler retries failed jobs.
> 3. It is reasonably performant: it moves the code to the data, maintaining locality, and scales with the number of nodes.
> Historically Hadoop is of course oriented toward processing key/value pairs, and so needs to interpret the data passing through it.  Unfortunately, this makes it difficult to use Hadoop streaming with programs that don't deal in key/value pairs, or with binary data in general.  For example, something as simple as running md5sum to verify the integrity of files will not give the correct result, due to Hadoop's interpretation of the data.  
> There have been several attempts at binary serialization schemes for Hadoop streaming, such as TypedBytes (HADOOP-1722); however, these are still aimed at efficiently encoding key/value pairs, and not passing data through unmodified.  Even the "RawBytes" serialization scheme adds length fields to the data, rendering it not-so-raw.
> I often have a need to run a Unix filter on files stored in HDFS; currently, the only way I can do this on the raw data is to copy the data out and run the filter on one machine, which is inconvenient, slow, and unreliable.  It would be very convenient to run the filter as a map-only job, allowing me to build on existing (well-tested!) building blocks in the Unix tradition instead of reimplementing them as mapreduce programs.
> However, most existing tools don't know about file splits, and so want to process whole files; and of course many expect raw binary input and output.  The solution is to run a map-only job with an InputFormat and OutputFormat that just pass raw bytes and don't split.  It turns out to be a little more complicated with streaming; I have attached a patch with the simplest solution I could come up with.  I call the format "JustBytes" (as "RawBytes" was already taken), and it should be usable with most recent versions of Hadoop.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira