Posted to common-user@hadoop.apache.org by Wasim Bari <wa...@msn.com> on 2009/08/18 16:35:13 UTC

Customized InputFormat

 

Hi,

   I tried another way to implement an InputFormat that returns <Key, MultipleLines> records to the mapper.

The logic is: a LineRecordReader reads the file line by line, and each line is appended to a buffer. When an empty line is encountered, the buffer contents are set as the value and the record is returned. Please see the attached code.

But I get a Java heap error. It apparently comes from the buffer writing, yet the data is not that big and I am unable to find the cause.

Please have a look and guide me.

 

regards,

 

============================================

package initial;

import java.io.IOException;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.log4j.Logger;

@SuppressWarnings("deprecation")
public class PTextInputFormat1 extends TextInputFormat {

  public void configure(JobConf jobConf) {
    super.configure(jobConf);
  }

  public RecordReader<LongWritable, Text> getRecordReader(InputSplit inputSplit,
      JobConf jobConf, Reporter reporter) throws IOException {
    return new PTextRecordReader((FileSplit) inputSplit, jobConf);
  }

  public static class PTextRecordReader implements RecordReader<LongWritable, Text> {

    private static final Logger sLogger = Logger.getLogger(PTextRecordReader.class);

    private DataOutputBuffer buffer = new DataOutputBuffer();
    private JobConf job;
    private FileSplit FSplit;
    private long start;
    private long end;
    private int count;
    org.apache.hadoop.mapred.LineRecordReader lineRecordReader;

    public PTextRecordReader(FileSplit split, JobConf jobConf) throws IOException {
      FSplit = split;
      start = split.getStart();
      job = jobConf;
      lineRecordReader = new org.apache.hadoop.mapred.LineRecordReader(job, FSplit);
      end = start + split.getLength();
    }

    public boolean next(LongWritable key, Text value) throws IOException {
      if (lineRecordReader.next(key, value)) {
        while (value.toString().length() != 0) {
          buffer.write(value.getBytes());
          numberOfLines++;
        }
        key.set(count++);
        value.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
        return true;
      }
      buffer.reset();
      return false;
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public Text createValue() {
      return new Text();
    }

    public long getStart() {
      return start;
    }

    public long getEnd() {
      return end;
    }

    public long getPos() throws IOException {
      return lineRecordReader.getPos();
    }

    public float getProgress() throws IOException {
      return lineRecordReader.getProgress();
    }

    @Override
    public void close() throws IOException {
      lineRecordReader.close();
    }
  }
}
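
A note on the heap error: in next(), lineRecordReader.next(key, value) is called only once, in the if condition. The while loop below it never advances the reader, so value never changes; whenever the first line read is non-empty, the loop keeps appending the same bytes to buffer until the heap fills up. As posted, the loop body also will not compile, since numberOfLines is never declared (the count field looks like the intended counter). A minimal sketch of a next() that reads lines inside the loop, untested and written against the same old mapred API, might look like this:

public boolean next(LongWritable key, Text value) throws IOException {
  buffer.reset();
  // Each call to lineRecordReader.next() advances the stream by one line.
  while (lineRecordReader.next(key, value)) {
    if (value.toString().length() == 0) {
      if (buffer.getLength() > 0) {
        break;      // a blank line terminates the current record
      }
      continue;     // skip leading blank lines
    }
    // Bound the copy by getLength(): Text's backing array can be
    // longer than the current string.
    buffer.write(value.getBytes(), 0, value.getLength());
    buffer.write('\n');  // keep the original line boundaries
  }
  if (buffer.getLength() == 0) {
    return false;   // end of split and nothing buffered
  }
  key.set(count++);
  value.set(buffer.getData(), 0, buffer.getLength());
  return true;
}

This sketch still ignores records that straddle a split boundary, which a production multi-line reader would have to handle; the format itself is wired in with jobConf.setInputFormat(PTextInputFormat1.class) as usual.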

Re: Customized InputFormat

Posted by Kris Jirapinyo <kr...@biz360.com>.
Do you ever close your DataOutputBuffer?

-- Kris J.
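
If the unclosed buffer is the concern, a minimal sketch of releasing it in close(), alongside the reader, would be:

@Override
public void close() throws IOException {
  buffer.close();   // DataOutputBuffer extends DataOutputStream, so it is closeable
  lineRecordReader.close();
}

Note that close() runs once per task, though, so this alone would not stop the buffer from growing while records are being read.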
