Posted to mapreduce-user@hadoop.apache.org by Christopher Piggott <cp...@gmail.com> on 2013/02/10 16:36:24 UTC

Confused about splitting

I'm a little confused about splitting and readers.

The data in my application is stored in files of Google protocol buffers.
There are multiple protocol buffers per file.  There are a number of
simple ways to put multiple protobufs in a single file, usually involving
writing some kind of length field before each one.  We did something a
little more complicated by defining a frame similar to HDLC: frames are
enveloped by a flag, with escapes provided so the flag can't occur within
the frame, and there is a 32-bit CRC-like checksum just before the closing
flag.
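
As an illustration of the framing described above, here is a minimal Java
sketch. The flag (0x7E), escape (0x7D), and XOR mask (0x20) values are
borrowed from HDLC byte stuffing and are assumptions, as is the use of
java.util.zip.CRC32 for the checksum. The names FrameWriter, encode, and
writeEscaped are hypothetical, and the actual format may well differ.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class FrameWriter {
    // Assumed values in the style of HDLC byte stuffing; not taken from the real format.
    static final int FLAG = 0x7E;
    static final int ESC = 0x7D;
    static final int XOR = 0x20;

    /** Wraps one serialized message as FLAG, escaped(payload + CRC32), FLAG. */
    static byte[] encode(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        // 32-bit checksum, big-endian, placed just before the closing flag.
        byte[] check = ByteBuffer.allocate(4).putInt((int) crc.getValue()).array();

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(FLAG);
        writeEscaped(out, payload);
        writeEscaped(out, check);
        out.write(FLAG);
        return out.toByteArray();
    }

    /** Escapes FLAG and ESC bytes so that a bare FLAG can only mark a frame edge. */
    static void writeEscaped(ByteArrayOutputStream out, byte[] data) {
        for (byte b : data) {
            int v = b & 0xFF;
            if (v == FLAG || v == ESC) {
                out.write(ESC);
                out.write(v ^ XOR);
            } else {
                out.write(v);
            }
        }
    }
}
```

Because a bare flag byte can never appear inside a frame, a reader that
starts at an arbitrary offset can resynchronize by scanning for the next
flag.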

The protobufs are all a type named RitRecord, and we have our own reader
that's something like this:

    public interface RitRecordReader {
        RitRecord getNext();
    }


The data collection application stores these things in ordinary flat files
(the whole thing is run through a GzipOutputFilter first, so the files are
compressed).  I'm having trouble understanding how to best apply this to
HDFS for map function consumption.  Our data collector writes 1 megabyte
files, but I can combine them for map/reduce performance.  To avoid TOO
much wasted space I was thinking about 16, 32, or 64 MB HDFS blocks (tbd).

What I don't get is this: suppose we have a long file that spans multiple
HDFS blocks.  I think I end up with problems similar to this guy:

    http://blog.rguha.net/?p=293

where one of my RitRecord objects is half in one HDFS block and half in
another HDFS block.  If the mapper is assigning tasks to nodes along HDFS
blocks then I'm going to end up with a problem.  It's not yet clear to me
how to solve this.  I could make the problem LESS likely with bigger blocks
(like the default 128MB) but even then, the problem doesn't completely go
away (for me, a >128MB file is unlikely but not impossible).

--Chris

Re: Confused about splitting

Posted by Michael Katzenellenbogen <mi...@cloudera.com>.
Have your record reader seek (and discard data) until it finds the
beginning of a frame, and always keep reading until the end/length of
the current frame/record. This ensures that if a PB record is split
between blocks, you will still read all of it.

For example, suppose you end up with 2 mappers, and 3 PB records across 2
HDFS splits. Mapper 1 will read one record immediately (it sees the
frame marker right away), and continue reading a second record up until its
end / length -- even into the next block. Mapper 2 will begin reading its
block and look for a frame marker, discarding data up until the
marker; it effectively reads one record once it finds the frame marker.
Between both mappers you've now read all 3 records.
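
The approach above can be sketched in plain Java, without any Hadoop
classes. A byte array stands in for the file. The names SplitFrameReader
and readSplit and the flag value 0x7E are hypothetical. The sketch assumes
that every frame has its own opening and closing flag and that a frame
belongs to the split containing its opening flag. It returns the
still-escaped frame bodies and leaves unescaping and checksum validation
to the existing reader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitFrameReader {
    static final int FLAG = 0x7E; // assumed flag byte; substitute the real one

    /**
     * Returns the raw (still escaped) bodies of every frame whose opening
     * flag lies in [start, end). 'file' stands in for the HDFS stream.
     */
    static List<byte[]> readSplit(byte[] file, int start, int end) {
        List<byte[]> frames = new ArrayList<>();
        int pos = start;
        // Discard the tail of a frame that began in the previous split.
        while (pos < file.length && (file[pos] & 0xFF) != FLAG) {
            pos++;
        }
        // Every flag inside this split is a candidate opening flag.
        while (pos < end && pos < file.length) {
            int close = pos + 1;
            // Keep reading up to the closing flag, even past 'end'.
            while (close < file.length && (file[close] & 0xFF) != FLAG) {
                close++;
            }
            if (close >= file.length) {
                break; // no closing flag: truncated frame or end of file
            }
            if (close > pos + 1) {
                frames.add(Arrays.copyOfRange(file, pos + 1, close));
            }
            // An empty run is only the gap between a closing and an opening flag.
            pos = close;
        }
        return frames;
    }
}
```

With this ownership rule, reading [0, k) and [k, length) returns each
frame once for any split point k. Note that this only applies to data the
reader can start decoding at an arbitrary offset, which is not the case
for a gzip-compressed file.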

HTH,
-Michael


Re: Confused about splitting

Posted by Hemanth Yamijala <yh...@thoughtworks.com>.
Adding on to the response: looking at the existing source code of
LineRecordReader, which has similar logic to read across HDFS block
boundaries and align with line boundaries, may also help you write
similar code. Harsh has previously responded on the list with more
specific details about where to look. For example:
http://search-hadoop.com/?q=question+about+file+split

Thanks
Hemanth


RE: Confused about splitting

Posted by java8964 java8964 <ja...@hotmail.com>.
Hi, Chris:

Here is my understanding of file splits and data blocks.

HDFS stores your file as multiple data blocks; each block is 64 MB or 128 MB, depending on your settings. Of course, the file can contain many records, so record boundaries generally won't line up with block boundaries (in fact, most of them won't).

It is the responsibility of the RecordReader to figure that out. The RecordReader is given the file split (typically one block) it should handle, and most likely the end of that split won't be the end of a record. So when the RecordReader reaches the end of its split, it also continues into the first bytes of the next block to finish its last record. Based on this contract, the RecordReader instance that handles the next block ignores those first bytes, since they are just part of the previous record, and goes straight to the starting point of the next record.

The above logic all assumes that the file is splittable. I did a project where the log files could contain embedded newline characters, so the TextInputFormat/LineRecordReader that ships with Hadoop didn't work, and I had to write my own InputFormat/RecordReader to handle the above logic. Making your File/InputFormat/RecordReader splittable is important for performance, because the data can then be processed concurrently, block by block. But some file formats, especially compressed formats like GZIP, do not support splitting. In that case, each file can ONLY be handled by one mapper. If you want to store your data in Gzip format, you may want to control your file size and keep it close to the block size.
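
A small JDK-only sketch of why gzip is not splittable: the decompressor needs the stream from its very first byte, so a reader dropped into the middle of a .gz file has nothing it can decode. GzipSplitDemo and its methods are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipSplitDemo {
    /** Compresses 'data' into a single gzip stream held in memory. */
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Returns true if the bytes from 'offset' onward decode as a complete gzip stream. */
    static boolean readableFrom(byte[] compressed, int offset) {
        byte[] tail = Arrays.copyOfRange(compressed, offset, compressed.length);
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(tail))) {
            byte[] buf = new byte[4096];
            while (in.read(buf) != -1) {
                // Drain the stream; only success or failure matters here.
            }
            return true;
        } catch (IOException e) {
            return false; // header, deflate data, or trailer could not be decoded
        }
    }
}
```

Here readableFrom(compressed, 0) succeeds, while starting at compressed.length / 2 should fail because the bytes at that offset are not a gzip header. A mapper assigned the second block of a large .gz file is in the same position, which is why the whole file goes to one mapper.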

For data stored as Google protocol buffers, you will probably have to write your own InputFormat/RecordReader to make it splittable. You could consider the LZO format, which compresses and also supports splitting. You can also look up elephant-bird, a framework from Twitter that supports Google protocol buffers and the LZO data format, which may make your life easier.

Thanks
Yong


Re: Confused about splitting

Posted by Michael Katzenellenbogen <mi...@cloudera.com>.
Have your record reader seek (and discard data) until it finds the
beginning of a frame, and always keep reading until the end/length of
the current frame/record. This ensures that a PB record that is
split between blocks is still read in full.

For example, suppose you end up with 2 mappers, and 3 PB records across 2
HDFS splits. Mapper 1 will read one record immediately (it sees the
frame marker right away), and continue reading a second record up until its
end / length -- even into the next block. Mapper 2 will begin reading its
block and look for a frame marker, discarding data up until the
marker; it effectively reads one record once it finds the frame marker.
Between both mappers you've now read all 3 records.
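
The rule above can be sketched in plain Java without any Hadoop classes. This
is only an illustration under stated assumptions, not the actual reader: the
flag byte 0x7E is borrowed from HDLC, the names SplitFrameReader, frame and
readSplit are made up for the example, escaping and the CRC check are left
out, and the whole "file" is a byte array. A frame belongs to the split that
contains its opening flag; the empty "frame" between a closing flag and the
next opening flag is skipped.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SplitFrameReader {
    // Assumed flag byte (the HDLC value); the real format may use another.
    static final byte FLAG = 0x7E;

    // Builds a toy file: FLAG payload FLAG per record. No escaping, no CRC,
    // so payloads must not contain the flag byte.
    static byte[] frame(String... records) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String r : records) {
            byte[] payload = r.getBytes(StandardCharsets.US_ASCII);
            out.write(FLAG);
            out.write(payload, 0, payload.length);
            out.write(FLAG);
        }
        return out.toByteArray();
    }

    // Returns the records whose opening flag lies in [start, end).
    static List<String> readSplit(byte[] file, int start, int end) {
        List<String> records = new ArrayList<>();
        int pos = start;
        // Discard data until the first flag at or after the split start.
        while (pos < file.length && file[pos] != FLAG) {
            pos++;
        }
        // pos always sits on a flag here; stop once that flag is past the split.
        while (pos < end && pos < file.length) {
            int next = pos + 1;
            // Read to the next flag, even if that means running past 'end'.
            while (next < file.length && file[next] != FLAG) {
                next++;
            }
            if (next >= file.length) {
                break; // no further flag: nothing complete left to read
            }
            if (next > pos + 1) { // skip the empty gap between two adjacent flags
                records.add(new String(file, pos + 1, next - pos - 1,
                        StandardCharsets.US_ASCII));
            }
            pos = next;
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] file = frame("rec-1", "rec-2", "rec-3");
        int boundary = 10; // falls inside the payload of the second record
        System.out.println("mapper 1: " + readSplit(file, 0, boundary));
        System.out.println("mapper 2: " + readSplit(file, boundary, file.length));
    }
}
```

With the boundary inside the second record, the first call returns rec-1 and
rec-2 (it reads past its own end to finish rec-2) and the second call returns
only rec-3, matching the 2-mapper, 3-record example. In a real job the same
logic would sit in a custom RecordReader reading from a seekable stream.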

HTH,
-Michael

RE: Confused about splitting

Posted by java8964 java8964 <ja...@hotmail.com>.
Hi, Chris:
Here is my understanding of file splits and data blocks.
HDFS stores your file as multiple data blocks; each block will be 64 MB or 128 MB depending on your settings. Of course, the file can contain multiple records, so record boundaries won't line up with block boundaries (in fact, most of them don't). It is the responsibility of the RecordReader to figure that out. The RecordReader is given the byte range of the file split (or block) it should handle, and most likely the end of that range won't be the end of a record. So when the RecordReader reaches the end of its block, it also continues into the first bytes of the next block to build up the whole of its last record. Based on this contract, the RecordReader instance that handles the next block ignores those first bytes, as they are just the tail of the previous record, and goes straight to the starting point of the next record.
The above logic all assumes that the file is splittable. I did a project where the log file could contain embedded newline characters, so the TextInputFormat/LineRecordReader that ships with Hadoop wouldn't work, and I had to write my own InputFormat/RecordReader to handle the above logic. Making the file/InputFormat/RecordReader splittable is important for performance, as the data can then be processed concurrently, block by block. But some file formats, especially compressed formats like gzip, do not support splitting. In that case, each file can ONLY be handled by one mapper. If you want to store your data in gzip format, you may want to control your file size and keep it close to the block size.
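
To see why a gzip file cannot be split, here is a small sketch using only java.util.zip (the class and method names GzipSplitDemo, gzip and canDecodeFrom are made up for the example, and the record contents are dummy text). It compresses some data and then tries to start decompressing from the middle of the compressed bytes, which is what a second mapper would have to do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipSplitDemo {
    // Compresses the data into a single gzip stream held in memory.
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Tries to decompress everything from 'offset' to the end of the compressed bytes.
    static boolean canDecodeFrom(byte[] compressed, int offset) {
        try (GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(compressed, offset, compressed.length - offset))) {
            byte[] chunk = new byte[4096];
            while (in.read(chunk) != -1) {
                // discard the output; only success or failure matters here
            }
            return true;
        } catch (IOException e) {
            // Typically a ZipException: no gzip header at this position.
            return false;
        }
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("record-").append(i).append('\n');
        }
        byte[] compressed = gzip(sb.toString().getBytes(StandardCharsets.US_ASCII));
        System.out.println("from start:  " + canDecodeFrom(compressed, 0));
        System.out.println("from middle: " + canDecodeFrom(compressed, compressed.length / 2));
    }
}
```

Decoding from offset 0 works, while decoding from the midpoint should fail because the decoder finds no gzip header there and has none of the earlier state. That is the reason one gzip file ends up with one mapper no matter how many HDFS blocks it spans.
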
For data stored as Google protocol buffers, you will probably have to write your own InputFormat/RecordReader to make it splittable. You could consider the LZO format, as it is compressed and also supports splitting. You can look up elephant-bird, a framework from Twitter that supports Google protocol buffers and the LZO data format, which should make your life easier.
Thanks
Yong
