Posted to common-user@hadoop.apache.org by Sean Arietta <sa...@virginia.edu> on 2008/06/30 18:22:29 UTC

RecordReader Functionality

Hello all,
I am having a problem writing my own RecordReader. The basic setup I have is
a large byte array that needs to be diced up into key/value pairs, where the
key is the index into the array and the value is a (much smaller) byte array.
Here is the code I currently have to do just this:


// This method is just the constructor for my new RecordReader
public TrainingRecordReader(Configuration job, FileSplit split) throws IOException
{
	start = split.getStart();
	end = start + split.getLength();
	final Path file = split.getPath();
	compressionCodecs = new CompressionCodecFactory(job);
	final CompressionCodec codec = compressionCodecs.getCodec(file);

	// open the file and seek to the start of the split
	FileSystem fs = file.getFileSystem(job);
	FSDataInputStream fileIn = fs.open(split.getPath());
	in = new TrainingReader(fileIn, job);
	this.pos = start;
}

// This returns the key/value pair I was talking about
public synchronized boolean next(LongWritable key, BytesWritable value) throws IOException
{
	if (pos >= end)
		return false;

	key.set(pos);           // key is position
	int newSize = in.readVector(value);
	if (newSize > 0)
	{
		pos += newSize;
		return true;
	}
	return false;
}

// This extracts the smaller byte array from the large input file
// (this method lives in TrainingReader, whose stream field is also called 'in')
public int readVector(BytesWritable value) throws IOException
{
	int numBytes = in.read(buffer);
	if (numBytes <= 0)      // read() returns -1 at EOF; don't set a negative size
		return 0;
	value.set(buffer, 0, numBytes);
	return numBytes;
}

All of this worked just fine when I set conf.set("mapred.job.tracker",
"local"), but now that I am testing in a pseudo-distributed setting (still a
single node, just without that config param), I do not get what I want.
Instead of unique key/value pairs, I get repeated pairs based on the number
of map tasks I have set. Say my large file contains 49 entries: I would want
a unique key/value pair for each of those, but with numMapTasks set to 7, I
get the same 7 unique pairs repeating every 7 records.

So it seems that each MapTask, which ultimately calls my
TrainingReader.next() method from above, is somehow pointing at the same
FileSplit. I know that LineRecordReader in the source has a small routine
that skips the first line of the data if you aren't at the beginning of the
file. Is that related? Why isn't split.getStart() returning the absolute
offset of the start of the split? So many questions I don't know the answer
to, haha.

I would appreciate anyone's help in resolving this issue. Thanks very much!

Cheers,
Sean M. Arietta


RE: RecordReader Functionality

Posted by Runping Qi <ru...@yahoo-inc.com>.
Your record reader must be able to find the beginning of the next record at
or beyond the start position of a given split, which means your file format
has to let the reader detect record boundaries. From the info I have seen so
far, that does not seem possible with your format.
Why not just use SequenceFile instead?
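
For example, here is a rough sketch of that approach (the path, record size,
and record count are made up for illustration); once the data is in a
SequenceFile, SequenceFileInputFormat handles the splits and you need no
custom RecordReader at all:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class TrainingSeqFileDemo
{
	public static void main(String[] args) throws Exception
	{
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		Path path = new Path("/tmp/training.seq");   // made-up path

		// Write each small vector as one record, keyed by its index.
		SequenceFile.Writer writer = SequenceFile.createWriter(
				fs, conf, path, LongWritable.class, BytesWritable.class);
		try
		{
			LongWritable key = new LongWritable();
			BytesWritable value = new BytesWritable();
			byte[] vector = new byte[16];        // stand-in for one real vector
			for (long i = 0; i < 49; i++)
			{
				key.set(i);
				value.set(vector, 0, vector.length);
				writer.append(key, value);
			}
		}
		finally
		{
			writer.close();
		}

		// Read the records back; record boundaries are now the file
		// format's problem, not your RecordReader's.
		SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
		try
		{
			LongWritable key = new LongWritable();
			BytesWritable value = new BytesWritable();
			while (reader.next(key, value))
			{
				// key.get() is the record index; value holds the small byte array
			}
		}
		finally
		{
			reader.close();
		}
	}
}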

Runping




Re: RecordReader Functionality

Posted by Sean Arietta <sa...@virginia.edu>.
I thought the input stream (called 'in' in this case) would maintain the
stream position based on how many bytes I had read via in.read(). Maybe that
is not the case...

Would it then be proper to call:

in.seek(pos);

I believe I tried this at one point and got an error. I will try again to be
sure, though. Thanks for your reply!
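
Or maybe the seek belongs in the constructor, right after opening the file?
Something like this, perhaps (just guessing, and assuming TrainingReader
doesn't reposition the stream itself):

// Guess: position the raw stream at the split start once, up front
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
fileIn.seek(start);   // FSDataInputStream is seekable
in = new TrainingReader(fileIn, job);
this.pos = start;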

Cheers,
Sean


Jorgen Johnson wrote:
> 
> Hi Sean,
> 
> Perhaps I'm missing something, but it doesn't appear to me that you're
> actually seeking to the filesplit start position in your constructor...
> 
> This would explain why all the mappers are getting the same records.
> 
> -jorgenj


Re: RecordReader Functionality

Posted by Jorgen Johnson <jo...@gmail.com>.
Hi Sean,

Perhaps I'm missing something, but it doesn't appear to me that you're
actually seeking to the filesplit start position in your constructor...

This would explain why all the mappers are getting the same records.
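
For reference, the usual pattern in a split-aware reader looks roughly like
this (a sketch from memory, not the actual LineRecordReader source;
skipToNextRecordBoundary() is a hypothetical helper that your file format
would have to make possible, which is the hard part):

// Seek to the split start, then, unless this is the first split, skip
// the possibly-partial record we landed in; the previous split's reader
// reads past its own end to emit that one.
FSDataInputStream fileIn = fs.open(split.getPath());
fileIn.seek(start);
if (start != 0)
{
	start += skipToNextRecordBoundary(fileIn);   // hypothetical helper
}
this.pos = start;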

-jorgenj



-- 
"Liberties are not given, they are taken."
- Aldous Huxley