Posted to mapreduce-user@hadoop.apache.org by Mapred Learn <ma...@gmail.com> on 2011/05/20 02:44:43 UTC

Custom input format query

Hi,
I have implemented a custom record reader to read fixed-length records.
The pseudocode is as follows:

 class CRecordReader extends RecordReader<Text, BytesWritable> {
     private FileSplit fileSplit;
     private Configuration conf;
     private int recordSize;
     private int fileSize;
     private int recordNum = 0;
     private FSDataInputStream fileIn = null;
     private Text key = null;
     private BytesWritable value = null;
     private static int offset = 0;
     private String layoutStr = "";

     public void initialize(InputSplit inputSplit,
             TaskAttemptContext taskAttemptContext) throws IOException {
         fileSplit = (FileSplit) inputSplit;
         final Path file = fileSplit.getPath();
         FileSystem fs = file.getFileSystem(conf);
         fileIn = fs.open(fileSplit.getPath());
         int metaInfoLen = fileIn.readInt();
         byte[] metaStr = new byte[metaInfoLen];
         int bytesRead = fileIn.read(metaStr, 0, metaInfoLen);
         if (bytesRead <= 0) {
             System.out.println("error bytes");
         }
         String metaInfo = new String(metaStr, "US-ASCII");
         String[] fileMetadata = metaInfo.split("-");
         String fileLenStr = fileMetadata[1];
         String recordLenStr = fileMetadata[2];
         layoutStr = fileMetadata[3];
         recordSize = Integer.parseInt(recordLenStr);
         fileSize = Integer.parseInt(fileLenStr);
     }

     public boolean nextKeyValue() throws IOException, InterruptedException {
         if (key == null) {
             key = new Text();
         }

         key.set(recordNum + "-" + layoutStr);

         if (value == null) {
             value = new BytesWritable();
         }

         int bytesRead = 0;
         while (offset < fileSize) {
             byte[] record = new byte[recordSize];

             bytesRead = fileIn.read(record, 0, recordSize);

             if ((bytesRead == 0) || (bytesRead < recordSize)) {
                 key = null;
                 value = null;
                 return false;
             }
             offset += bytesRead;
             value.set(record, 0, recordSize);
         }
         return true;
     }
 }

The problem is that my input file has only 2 records, but the mapper keeps
iterating over the first record again and again and never ends.

Obviously I am doing something wrong. Could somebody point out what it is?

Thanks in advance
- Jimmy

Re: Custom input format query

Posted by Mapred Learn <ma...@gmail.com>.
Hi Steve,
I did what you said: I removed the while loop and replaced it with:
if (offset < fileSize) {
.
}

But now the mapper does not process the complete file. It stops somewhere
in the middle and processes only a little more than half the records in the
input split. For example, fileSize is 116500004 and the last value of offset
that I see in the log is 67108664. This happens in all the mappers.
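
One possible cause, offered as a guess from the numbers rather than something confirmed in this thread: the last logged offset, 67108664, sits just below 64 MiB (67108864), a common HDFS block size at the time. A plain read(buf, 0, len) is allowed to return fewer bytes than requested, and the reader above treats any short read as end of input. FSDataInputStream extends java.io.DataInputStream, so readFully(record, 0, recordSize) is available and keeps reading until the buffer is full. The sketch below uses only java.io; ChunkedStream is a hypothetical stand-in that returns at most a few bytes per call, to imitate a short read.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class ShortReadDemo {
    /** Hypothetical stream that hands back at most `chunk` bytes per read call. */
    static class ChunkedStream extends InputStream {
        private final ByteArrayInputStream in;
        private final int chunk;

        ChunkedStream(byte[] data, int chunk) {
            this.in = new ByteArrayInputStream(data);
            this.chunk = chunk;
        }

        @Override
        public int read() {
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) {
            // Deliberately return a short count, as a real stream may do.
            return in.read(b, off, Math.min(len, chunk));
        }
    }

    /** A single read(): returns however many bytes the stream chose to deliver. */
    public static int plainRead(byte[] data, int chunk, int recordSize) {
        ChunkedStream in = new ChunkedStream(data, chunk);
        return in.read(new byte[recordSize], 0, recordSize);
    }

    /** readFully(): loops until the record is complete, or fails with EOFException. */
    public static byte[] fullRead(byte[] data, int chunk, int recordSize) {
        DataInputStream in = new DataInputStream(new ChunkedStream(data, chunk));
        byte[] record = new byte[recordSize];
        try {
            in.readFully(record, 0, recordSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return record;
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8};
        System.out.println(plainRead(data, 3, 8));       // prints 3
        System.out.println(fullRead(data, 3, 8).length); // prints 8
    }
}
```

With a check like (bytesRead < recordSize), the first call would be mistaken for end of file even though five more bytes are waiting.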




On Thu, May 19, 2011 at 5:57 PM, Steve Lewis <lo...@gmail.com> wrote:

> 1) add
>
>         @Override
>         public Text getCurrentKey() {
>             return key;
>         }
>
>         @Override
>         public BytesWritable getCurrentValue() {
>             return value;
>         }
>
> 2) do not make offset static
> 3) nextKeyValue should read a single record not
>     while (offset < fileSize ) { ...
>

Re: Custom input format query

Posted by Steve Lewis <lo...@gmail.com>.
1) Add the accessors:

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public BytesWritable getCurrentValue() {
            return value;
        }

2) Do not make offset static.
3) nextKeyValue should read a single record per call, not loop with
    while (offset < fileSize ) { ...
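
The three points above can be sketched in plain Java. This is a stand-in for the pattern, not the Hadoop API: FixedLengthReaderSketch, its constructor arguments, and readAll are hypothetical names, String and byte[] replace Text and BytesWritable, and java.io replaces FSDataInputStream. The framework calls nextKeyValue() repeatedly until it returns false and fetches the current key and value after each true, so each call should advance by exactly one record.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FixedLengthReaderSketch {
    private final DataInputStream in;
    private final int recordSize;
    private final long dataSize;  // total bytes of record data to consume
    private final String layout;
    private long offset = 0;      // instance field: each reader tracks its own position
    private int recordNum = 0;
    private String key = null;
    private byte[] value = null;

    public FixedLengthReaderSketch(InputStream in, int recordSize,
                                   long dataSize, String layout) {
        this.in = new DataInputStream(in);
        this.recordSize = recordSize;
        this.dataSize = dataSize;
        this.layout = layout;
    }

    /** Reads exactly one record; returns false once no full record remains. */
    public boolean nextKeyValue() {
        if (offset + recordSize > dataSize) {
            key = null;
            value = null;
            return false;
        }
        byte[] record = new byte[recordSize];
        try {
            in.readFully(record); // never accepts a partial record
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        key = recordNum + "-" + layout;
        value = record;
        recordNum++;
        offset += recordSize;
        return true;
    }

    public String getCurrentKey() {
        return key;
    }

    public byte[] getCurrentValue() {
        return value;
    }

    /** Drives the reader the way a caller would: loop until nextKeyValue() is false. */
    public static List<String> readAll(byte[] data, int recordSize, String layout) {
        FixedLengthReaderSketch reader = new FixedLengthReaderSketch(
                new ByteArrayInputStream(data), recordSize, data.length, layout);
        List<String> out = new ArrayList<>();
        while (reader.nextKeyValue()) {
            out.add(reader.getCurrentKey() + "="
                    + new String(reader.getCurrentValue(), StandardCharsets.US_ASCII));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = "AAAABBBB".getBytes(StandardCharsets.US_ASCII);
        System.out.println(readAll(data, 4, "L")); // prints [0-L=AAAA, 1-L=BBBB]
    }
}
```

Unlike the posted code, this sketch increments recordNum after every record, so each key is distinct.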




-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com