Posted to user@hive.apache.org by Denitsa Tsolova <de...@gmail.com> on 2010/11/19 18:16:34 UTC

How to develop custom Avro InputFormat and Deserializer?

Hi all,
We have a large amount of data in HDFS stored in Avro format, and we don't
want to convert it to a Hive-supported format.
That is why we have developed a custom InputFormat and Deserializer. Our
custom InputFormat does not support file splits, because the Avro schema
(which describes the structure of the records stored in the file) is placed
at the end of the file. Here is how our AvroInputFormat is implemented:

public class AvroInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit inputSplit,
            JobConf job, Reporter reporter) throws IOException {

        FileSplit split = (FileSplit) inputSplit;
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);

        return new AvroRecordReader(file, fs);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    class AvroRecordReader implements RecordReader<LongWritable, Text> {

        private final Path file;
        private final FileSystem fs;

        public AvroRecordReader(Path file, FileSystem fs) {
            this.file = file;
            this.fs = fs;
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public LongWritable createKey() {
            LongWritable longWritable = new LongWritable(1);
            return longWritable;
        }

        @Override
        public Text createValue() {
            return new Text();
        }

        @Override
        public long getPos() throws IOException {
            return 0;
        }

        @Override
        public float getProgress() throws IOException {
            return 0;
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {

            FSDataInputStream fileIn = null;
            try {
                fileIn = fs.open(file);

                byte[] bytes = new byte[1024];
                int readBytes = 0;

                while ((readBytes = fileIn.read(bytes)) > 0) {
                    value.append(bytes, 0, readBytes);
                }
            } finally {
                if (fileIn != null) {
                    fileIn.close();
                }
            }

            return false;
        }
    }
}
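
Our understanding of the mapred RecordReader contract is that next() should
return true each time it fills in a key/value pair and false only once the
input is exhausted, while our current version always returns false. A
one-record-per-file variant would presumably look something like the sketch
below (untested, only to illustrate what we have in mind):

        private boolean processed = false;

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            if (processed) {
                // the single record for this file was already emitted
                return false;
            }

            FSDataInputStream fileIn = null;
            try {
                fileIn = fs.open(file);

                byte[] bytes = new byte[1024];
                int readBytes = 0;

                // read the whole file into the value
                while ((readBytes = fileIn.read(bytes)) > 0) {
                    value.append(bytes, 0, readBytes);
                }
            } finally {
                if (fileIn != null) {
                    fileIn.close();
                }
            }

            processed = true;
            // report that a key/value pair has been produced
            return true;
        }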

Are there any problems with that implementation? Also, we think that we
don't need an OutputFormat implementation, since we are not going to store
data in Avro format. That is why we pass
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat to the OUTPUTFORMAT
clause when creating the table. For instance:
CREATE EXTERNAL TABLE avro_data (Url STRING, ContentType STRING)
ROW FORMAT SERDE 'test.AvroSerDe'
STORED AS INPUTFORMAT 'test.AvroInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/avro';
Is this correct?

Regarding our SerDe: again, we are not going to store data in Avro format,
so we implement only the Deserializer interface. Here is our dummy/testing
implementation:

public class AvroSerDe implements Deserializer {
    private int count = 0;
    private ObjectInspector objectInspector;

    @Override
    public Object deserialize(Writable value) throws SerDeException {

        // dummy implementation
        String url = "http://dummy.com/?p=" + count;
        count++;
        String contentType = "dummy web site content";

        LinkedList<String> columns = new LinkedList<String>();
        columns.add(url);
        columns.add(contentType);

        return columns;
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return objectInspector;
    }

    private ObjectInspector createObjectInspector() {
        List<String> structFieldNames = new ArrayList<String>();
        structFieldNames.add("URL");
        structFieldNames.add("ContentType");

        List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(
                structFieldNames,
                structFieldObjectInspectors);
    }

    @Override
    public void initialize(Configuration arg0, Properties arg1) throws SerDeException {
        objectInspector = createObjectInspector();

    }

}
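
Eventually deserialize will need to read real Avro records instead of
producing dummy values. Our rough plan is something like the sketch below
(untested; it assumes the incoming Text holds the bytes of a complete Avro
container file, that the schema has string fields named Url and ContentType,
and it uses org.apache.avro.file.DataFileStream with
org.apache.avro.generic.GenericDatumReader):

    @Override
    public Object deserialize(Writable writable) throws SerDeException {
        Text text = (Text) writable;
        try {
            // interpret the bytes packed into the value as an Avro container file
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
            DataFileStream<GenericRecord> stream = new DataFileStream<GenericRecord>(
                    new ByteArrayInputStream(text.getBytes(), 0, text.getLength()),
                    datumReader);
            try {
                List<String> columns = new ArrayList<String>();
                if (stream.hasNext()) {
                    // only the first record for now, i.e. one row per file
                    GenericRecord record = stream.next();
                    columns.add(String.valueOf(record.get("Url")));
                    columns.add(String.valueOf(record.get("ContentType")));
                }
                // returning a List still matches the StandardStructObjectInspector
                // built in createObjectInspector()
                return columns;
            } finally {
                stream.close();
            }
        } catch (IOException e) {
            throw new SerDeException(e);
        }
    }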


When we execute a SELECT statement against the created avro_data table,
we don't get any results. When we add logging to these classes, it appears
that AvroSerDe is never used and only AvroInputFormat is called.

Anyway, when the data_avro table is defined in the following way:
CREATE EXTERNAL TABLE data_avro (Url STRING, ContentType STRING)
ROW FORMAT SERDE 'test.AvroSerDe'
STORED AS TEXTFILE
LOCATION '/data/avro';
we get many dummy results:
...
http://dummy.com/?p=39089    dummy web site content
http://dummy.com/?p=39090    dummy web site content
http://dummy.com/?p=39091    dummy web site content
Time taken: 5.616 seconds

Note that the /data/avro folder contains only 6 sample files, yet they end up
split into 39091 chunks. We do not want the input files to be split, for the
reasons mentioned above.
Do you have any ideas regarding these problems?

Thank you in advance!
Regards