You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "Joydeep Sen Sarma (JIRA)" <ji...@apache.org> on 2008/09/09 03:47:48 UTC

[jira] Issue Comment Edited: (HADOOP-4065) support for reading binary data from flat files

    [ https://issues.apache.org/jira/browse/HADOOP-4065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12629362#action_12629362 ] 

jsensarma edited comment on HADOOP-4065 at 9/8/08 6:47 PM:
-------------------------------------------------------------------

@Pete - i am still trying to understand the proposed interface. in my case - the data is not splitable (so maybe there's a different base class - UnsplittableFileInputFormat). The factory approach for getting a record reader sounds interesting - but on second thoughts - since the getRR() already takes in a Configuration object - we don't need a new interface for this i think. (Meaning that we could write a ConfigurableRecordReader that could look at the config and instantiate the right record reader and then redirect all RR api calls to the contained record reader.)

the main motivation i had for filing this bug was to extract out the common parts for dealing with compressed, non-splitable binary data into a base class (and most of this functionality is in the record reader) and make it easy to handle new kinds of binary data with minimal code. Binary files don't have keys and values - they just have rows of data. So one part is to supply some default key (like record number). The remaining part is to get the class of the row and the deserialization method. Appropriately - it would be nice to have a deserializerFactory that takes in a Configuration object (which is i think one of the key missing parts of hadoop-1986). that way - the deserializer can be configured by application - instead of hadoop maintaining a mapping from class -> Deserializer.

so my proposal would be something like this (admittedly - i am not being very ambitious here - just want to cover the issue mentioned in this jira):

{code}
/**
  * forced to make this class since maprunner tries to use same object for all next() calls.
  * This way we can swap out the actual 'row' object on each call to next()
  */
public class RowContainer {
   public Object row;
}

/**
 * Application can return right deserializer based on configuration
 */
public interface RowSource {
   public Deserializer<?> getDeserializer(Configuration conf) throws IOException;
   public Class<?> getClass();
}

/**
 * Reads a non-splitable binary flat file.
 */
public class RowSourceFileInputFormat extends FileInputFormat<LongWritable, RowContainer> {
  protected boolean isSplitable(FileSystem fs, Path file) { return false; }
  public RecordReader<LongWritable, RowContainer> getRecordReader(InputSplit genericSplit, JobConf job,
                                                                      Reporter reporter)
    throws IOException {
    reporter.setStatus(genericSplit.toString());
    return new RowSourceRecordReader(job, (FileSplit) genericSplit);
  }
}

/**
 * Reads one row at a time. The key is the row number and the actual row is returned inside the RowContainer
 */
public class RowSourceRecordReader implements RecordReader<LongWritable, RowContainer> {
    private long rnum;
    private final DataInput in;
    private final DecompressorStream dcin;
    private final FSDataInputStream fsin;
    private final long end;
    private final Deserializer deserializer;

    public RowSourceRecordReader(Configuration job,
                                FileSplit split) throws IOException {

      final Path file = split.getPath();
      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(job);
      final CompressionCodec codec = compressionCodecs.getCodec(file);
      FileSystem fs = file.getFileSystem(job);
      fsin = fs.open(split.getPath());

      if(codec != null) {
        dcin = (DecompressorStream)codec.createInputStream(fsin);
        in = new DataInputStream(dcin);
      } else {
        dcin = null;
        in = fsin;
      }
      rnum = 0;
      end = split.getLength();

      deserializer=(ReflectionUtils.newInstance(job.getClass("mapred.input.rowsource", null, RowSource.class)).getDeserializer(job);
      deserializer.open(in);
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public RowContainer createValue() {
       return new RowContainer();
    }

    public synchronized boolean next(LongWritable key, RowContainer value) throws IOException {
      if(dcin != null) {
        if (dcin.available() == 0) {
          return false;
        }
      } else {
        if(fsin.getPos() >= end) {
          return false;
        }
      }
      key.set(rnum++);
      Object row = deserializer.deserialize(value.row);
      value.row = row;
      return true;
    }

    public synchronized float getProgress() throws IOException {
      // this assumes no splitting                                                                                               
      if (end == 0) {
        return 0.0f;
      } else {
        // gives progress over uncompressed stream                                                                               
        return Math.min(1.0f, fsin.getPos()/(float)(end));


    public synchronized long getPos() throws IOException {
      // position over uncompressed stream. not sure what                                                                        
      // effect this has on stats about job                                                                                      
      return fsin.getPos();
    }

    public synchronized void close() throws IOException {
       // assuming that this closes the underlying streams
       deserializer.close();
    }
}
{code}

      was (Author: jsensarma):
    @Pete - i am still trying to understand the proposed interface. in my case - the data is not splitable (so maybe there's a different base class - UnsplittableFileInputFormat). The factory approach for getting a record reader sounds interesting - but on second thoughts - since the getRR() already takes in a Configuration object - we don't need a new interface for this i think. (Meaning that we could write a ConfigurableRecordReader that could look at the config and instantiate the right record reader and then redirect all RR api calls to the contained record reader.)

the main motivation i had for filing this bug was to extract out the common parts for dealing with compressed, non-splitable binary data into a base class (and most of this functionality is in the record reader) and make it easy to handle new kinds of binary data with minimal code. Binary files don't have keys and values - they just have rows of data. So one part is to supply some default key (like record number). The remaining part is to get the class of the row and the deserialization method. Appropriately - it would be nice to have a deserializerFactory that takes in a Configuration object (which is i think one of the key missing parts of hadoop-1986). that way - the deserializer can be configured by application - instead of hadoop maintaining a mapping from class -> Deserializer.

so my proposal would be something like this (admittedly - i am not being very ambitious here - just want to cover the issue mentioned in this jira):

/**
  * forced to make this class since maprunner tries to use same object for all next() calls.
  * This way we can swap out the actual 'row' object on each call to next()
  */
public class RowContainer {
   public Object row;
}

/**
 * Application can return right deserializer based on configuration
 */
public interface RowSource {
   public Deserializer<?> getDeserializer(Configuration conf) throws IOException;
   public Class<?> getClass();
}

/**
 * Reads a non-splitable binary flat file.
 */
public class RowSourceFileInputFormat extends FileInputFormat<LongWritable, RowContainer> {
  protected boolean isSplitable(FileSystem fs, Path file) { return false; }
  public RecordReader<LongWritable, RowContainer> getRecordReader(InputSplit genericSplit, JobConf job,
                                                                      Reporter reporter)
    throws IOException {
    reporter.setStatus(genericSplit.toString());
    return new RowSourceRecordReader(job, (FileSplit) genericSplit);
  }
}

/**
 * Reads one row at a time. The key is the row number and the actual row is returned inside the RowContainer
 */
public class RowSourceRecordReader implements RecordReader<LongWritable, RowContainer> {
    private long rnum;
    private final DataInput in;
    private final DecompressorStream dcin;
    private final FSDataInputStream fsin;
    private final long end;
    private final Deserializer deserializer;

    public RowSourceRecordReader(Configuration job,
                                FileSplit split) throws IOException {

      final Path file = split.getPath();
      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(job);
      final CompressionCodec codec = compressionCodecs.getCodec(file);
      FileSystem fs = file.getFileSystem(job);
      fsin = fs.open(split.getPath());

      if(codec != null) {
        dcin = (DecompressorStream)codec.createInputStream(fsin);
        in = new DataInputStream(dcin);
      } else {
        dcin = null;
        in = fsin;
      }
      rnum = 0;
      end = split.getLength();

      deserializer=(ReflectionUtils.newInstance(job.getClass("mapred.input.rowsource", null, RowSource.class)).getDeserializer(job);
      deserializer.open(in);
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public RowContainer createValue() {
       return new RowContainer();
    }

    public synchronized boolean next(LongWritable key, RowContainer value) throws IOException {
      if(dcin != null) {
        if (dcin.available() == 0) {
          return false;
        }
      } else {
        if(fsin.getPos() >= end) {
          return false;
        }
      }
      key.set(rnum++);
      Object row = deserializer.deserialize(value.row);
      value.row = row;
      return true;
    }

    public synchronized float getProgress() throws IOException {
      // this assumes no splitting                                                                                               
      if (end == 0) {
        return 0.0f;
      } else {
        // gives progress over uncompressed stream                                                                               
        return Math.min(1.0f, fsin.getPos()/(float)(end));


    public synchronized long getPos() throws IOException {
      // position over uncompressed stream. not sure what                                                                        
      // effect this has on stats about job                                                                                      
      return fsin.getPos();
    }

    public synchronized void close() throws IOException {
       // assuming that this closes the underlying streams
       deserializer.close();
    }
}

  
> support for reading binary data from flat files
> -----------------------------------------------
>
>                 Key: HADOOP-4065
>                 URL: https://issues.apache.org/jira/browse/HADOOP-4065
>             Project: Hadoop Core
>          Issue Type: Bug
>          Components: mapred
>            Reporter: Joydeep Sen Sarma
>         Attachments: HADOOP-4065.0.txt, ThriftFlatFile.java
>
>
> like textinputformat - looking for a concrete implementation to read binary records from a flat file (that may be compressed).
> it's assumed that hadoop can't split such a file. so the inputformat can set splittable to false.
> tricky aspects are:
> - how to know what class the file contains (has to be in a configuration somewhere).
> - how to determine EOF (would be nice if hadoop can determine EOF and not have the deserializer throw an exception  (which is hard to distinguish from a exception due to corruptions?)). this is easy for non-compressed streams - for compressed streams - DecompressorStream has a useful looking getAvailable() call - except the class is marked package private.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.