Posted to user@avro.apache.org by Matt Pouttu-Clarke <Ma...@icrossing.com> on 2011/09/22 20:23:25 UTC

Can we get a Hadoop record reader with GenericDatumReader support?

Hi All,

On our project we use the schema evolution features of Avro, and we also
need to select data without knowing the Schema(s) a priori.  In other words
we have a large number of Avro files with an evolving schema, and we may
project with any historical schema, or not project any schema at all.  In
the case of not being able to specify a specific schema, we could not find a
RecordReader which supports GenericDatumReader.  Since we are using
Cascading.Avro to hide the Avro plumbing, we created a patch to
Cascading.Avro, providing a generic record reader.   This is obviously a
temporary fix as we would prefer that this support existed within Avro
itself.  

Can anyone suggest a more elegant solution? Source code is below...

Cheers,
Matt

    import java.io.IOException;

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.mapred.AvroWrapper;
    import org.apache.avro.mapred.FsInput;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;

    public class CascadingAvroRecordReader<T>
        implements RecordReader<AvroWrapper<T>, NullWritable>
    {

        private FsInput in;
        private DataFileReader<T> reader;
        private long start;
        private long end;

        public CascadingAvroRecordReader(JobConf job, FileSplit split)
        throws IOException
        {
            this.in = new FsInput(split.getPath(), job);
            this.reader = new DataFileReader<T>(in,
                new GenericDatumReader<T>());
            // Sync to the first block boundary at or after the split start.
            reader.sync(split.getStart());
            this.start = in.tell();
            this.end = split.getStart() + split.getLength();
        }

        public AvroWrapper<T> createKey() {
            return new AvroWrapper<T>(null);
        }

        public NullWritable createValue() {
            return NullWritable.get();
        }

        public boolean next(AvroWrapper<T> wrapper, NullWritable ignore)
        throws IOException {
            // Stop at end of file, or once we have read past this split.
            if (!reader.hasNext() || reader.pastSync(end))
                return false;
            wrapper.datum(reader.next(wrapper.datum()));
            return true;
        }

        public float getProgress() throws IOException {
            if (end == start) {
                return 0.0f;
            } else {
                return Math.min(1.0f,
                    (in.tell() - start) / (float) (end - start));
            }
        }

        public long getPos() throws IOException {
            return in.tell();
        }

        public void close() throws IOException {
            reader.close();
        }
    }
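
To hook the reader into a job, a matching InputFormat is also needed. A minimal sketch under the old mapred API follows; the class name and wiring here are illustrative, not part of the patch itself:

    import java.io.IOException;

    import org.apache.avro.mapred.AvroWrapper;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    // Illustrative: returns the generic record reader above for each split.
    public class CascadingAvroInputFormat<T>
        extends FileInputFormat<AvroWrapper<T>, NullWritable>
    {
        @Override
        public RecordReader<AvroWrapper<T>, NullWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter)
        throws IOException {
            reporter.setStatus(split.toString());
            return new CascadingAvroRecordReader<T>(job, (FileSplit) split);
        }
    }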




Re: Can we get a Hadoop record reader with GenericDatumReader support?

Posted by Doug Cutting <cu...@apache.org>.
On 09/22/2011 11:23 AM, Matt Pouttu-Clarke wrote:
> On our project we use the schema evolution features of Avro, and we also
> need to select data without knowing the Schema(s) a priori.  In other
> words we have a large number of Avro files with an evolving schema, and
> we may project with any historical schema, or not project any schema at
> all.  In the case of not being able to specify a specific schema, we
> could not find a RecordReader which supports GenericDatumReader.

SpecificDatumReader extends GenericDatumReader.  If no class is defined
corresponding to a record then SpecificDatumReader will produce a
GenericRecord.  The only case where this might not be what you want is
if you have a specific, generated class loaded but wish to force the use
of a GenericRecord, e.g., if the schema of the data written includes
fields not in the class that's loaded and you wish to read those fields.
In that case, perhaps we should add a feature permitting one to force
the use of GenericDatumReader in AvroInputFormat?  If so, please file an
issue in Jira.

Doug
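
For reference, the generic-read path described above looks roughly like this outside of MapReduce. This is a sketch, and the file name is illustrative:

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class GenericRead {
        public static void main(String[] args) throws IOException {
            // No reader schema supplied: each record comes back as a
            // GenericRecord using the writer's schema from the file header.
            GenericDatumReader<GenericRecord> datumReader =
                new GenericDatumReader<GenericRecord>();
            DataFileReader<GenericRecord> fileReader =
                new DataFileReader<GenericRecord>(
                    new File("events.avro"), datumReader);
            try {
                GenericRecord record = null;
                while (fileReader.hasNext()) {
                    record = fileReader.next(record);  // reuse the record
                    System.out.println(record);
                }
            } finally {
                fileReader.close();
            }
        }
    }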