You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by Wasim Bari <wa...@msn.com> on 2009/08/18 16:35:13 UTC
Customized InputFormat
Hi,
I tried anotherway to implement the InputFileFormat which returns <Key,MultipleLines> as record to mapper.
I used this logic: Used a LineRecordReader to read file line by line and keep storing these lines in buffer.
when i encouters an empty string , Set the buffer to value and return the result. Please see the attached code.
But i get Java Heap error. apparently its because of buffer writing, but data is not so big and i am unable to find the solution.
Please have a look and guide me.
regards,
============================================
package initial;
import java.io.IOException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.log4j.Logger;
@SuppressWarnings("deprecation")
public class PTextInputFormat1 extends TextInputFormat {
public void configure(JobConf jobConf) {
super.configure(jobConf);
}
public RecordReader<LongWritable, Text> getRecordReader(InputSplit inputSplit, JobConf jobConf,
Reporter reporter) throws IOException {
return new PTextRecordReader((FileSplit) inputSplit, jobConf);
}
public static class PTextRecordReader implements RecordReader<LongWritable, Text> {
private static final Logger sLogger = Logger.getLogger(PTextRecordReader.class);
private DataOutputBuffer buffer = new DataOutputBuffer();
private JobConf job;
private FileSplit FSplit;
private long start;
private long end;
private int count;
org.apache.hadoop.mapred.LineRecordReader lineRecordReader;
public PTextRecordReader(FileSplit split, JobConf jobConf) throws IOException {
FSplit=split;
start = split.getStart();
job = jobConf;
lineRecordReader = new org.apache.hadoop.mapred.LineRecordReader(job,FSplit);
end = start + split.getLength();
}
public boolean next(LongWritable key, Text value) throws IOException {
if (lineRecordReader.next(key, value)){
while (value.toString().length()!=0){
buffer.write(value.getBytes());
numberOfLines++;
}
key.set(count++);
value.set(buffer.getData(), 0, buffer.getLength());
buffer.reset();
return true;
}
buffer.reset();
return false;
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
public long getStart() {
return start;
}
public long getEnd() {
return end;
}
public long getPos() throws IOException {
return lineRecordReader.getPos();
}
public float getProgress() throws IOException {
return lineRecordReader.getProgress();
}
@Override
public void close() throws IOException {
lineRecordReader.close();
}
}
}
Re: Customized InputFormat
Posted by Kris Jirapinyo <kr...@biz360.com>.
Do you ever close your DataOutputBuffer?
-- Kris J.
On Tue, Aug 18, 2009 at 7:35 AM, Wasim Bari <wa...@msn.com> wrote:
>
> Hi,
> I tried anotherway to implement the InputFileFormat which returns
> <Key,MultipleLines> as record to mapper.
>
> I used this logic: Used a LineRecordReader to read file line by line and
> keep storing these lines in buffer.
> when i encouters an empty string , Set the buffer to value and return the
> result. Please see the attached code.
>
>
> But i get Java Heap error. apparently its because of buffer writing, but
> data is not so big and i am unable to find the solution.
>
> Please have a look and guide me.
>
> regards,
>
> ============================================
> package initial;
> import java.io.IOException;
> import org.apache.hadoop.io.DataOutputBuffer;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.FileSplit;
> import org.apache.hadoop.mapred.InputSplit;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.RecordReader;
> import org.apache.hadoop.mapred.Reporter;
> import org.apache.hadoop.mapred.TextInputFormat;
> import org.apache.log4j.Logger;
> @SuppressWarnings("deprecation")
> public class PTextInputFormat1 extends TextInputFormat {
>
> public void configure(JobConf jobConf) {
> super.configure(jobConf);
> }
> public RecordReader<LongWritable, Text> getRecordReader(InputSplit
> inputSplit, JobConf jobConf,
> Reporter reporter) throws IOException {
> return new PTextRecordReader((FileSplit) inputSplit, jobConf);
> }
> public static class PTextRecordReader implements RecordReader<LongWritable,
> Text> {
> private static final Logger sLogger =
> Logger.getLogger(PTextRecordReader.class);
>
> private DataOutputBuffer buffer = new DataOutputBuffer();
> private JobConf job;
> private FileSplit FSplit;
> private long start;
> private long end;
> private int count;
> org.apache.hadoop.mapred.LineRecordReader lineRecordReader;
> public PTextRecordReader(FileSplit split, JobConf jobConf) throws
> IOException {
>
> FSplit=split;
> start = split.getStart();
> job = jobConf;
> lineRecordReader = new
> org.apache.hadoop.mapred.LineRecordReader(job,FSplit);
> end = start + split.getLength();
> }
> public boolean next(LongWritable key, Text value) throws IOException {
>
> if (lineRecordReader.next(key, value)){
> while (value.toString().length()!=0){
> buffer.write(value.getBytes());
> numberOfLines++;
> }
> key.set(count++);
> value.set(buffer.getData(), 0, buffer.getLength());
> buffer.reset();
> return true;
> }
> buffer.reset();
> return false;
>
> }
>
> public LongWritable createKey() {
> return new LongWritable();
> }
> public Text createValue() {
> return new Text();
> }
> public long getStart() {
> return start;
> }
> public long getEnd() {
> return end;
> }
> public long getPos() throws IOException {
> return lineRecordReader.getPos();
> }
>
> public float getProgress() throws IOException {
> return lineRecordReader.getProgress();
> }
> @Override
> public void close() throws IOException {
> lineRecordReader.close();
>
> }
> }
> }
>