Posted to user@flink.apache.org by Tarandeep Singh <ta...@gmail.com> on 2016/04/01 10:04:48 UTC

Example - Reading Avro Generic records

Hi,

Can someone please point me to an example of creating a DataSet using Avro
Generic Records?

I tried this code -

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final Path iPath = new Path(args[0]);

    DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
    dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
        @Override
        public Tuple2<Integer,String> map(GenericRecord record) {
            Integer id = (Integer) record.get("id");
            String userAgent = (String) record.get("user_agent");
            return new Tuple2<>(id, userAgent);
        }
    }).writeAsText(args[1]);

    env.execute();

But I got an exception-

Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
    at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

From the stack trace, I gather that AvroInputFormat tries to read the Avro
file as SpecificRecords. Is there a way to read an Avro file as GenericRecords?


Thanks,
Tarandeep
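The "Not a Specific class" error above occurs before any data is read: AvroInputFormat builds its datum reader from the record class, and Avro's specific/reflect path derives the schema from the class itself. A minimal, dependency-free sketch of the kind of check involved; the SCHEMA$ lookup is an assumption about how Avro-generated record classes carry their schema (drawn from what such classes look like, not from this thread):

```java
// Generated Avro record classes carry their schema in a public static
// SCHEMA$ field; a plain interface like GenericRecord has no such field,
// so a class-based schema lookup has nothing to find.
public class GeneratedSchemaCheck {

    // Hypothetical stand-in for an Avro-generated record class.
    static class GeneratedRecord {
        public static final String SCHEMA$ = "{\"type\":\"record\"}";
    }

    static boolean hasGeneratedSchema(Class<?> c) {
        try {
            c.getDeclaredField("SCHEMA$");
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasGeneratedSchema(GeneratedRecord.class)); // true
        // Runnable stands in for the GenericRecord interface here.
        System.out.println(hasGeneratedSchema(Runnable.class));        // false
    }
}
```

Since GenericRecord is an interface with no generated schema, the specific/reflect path has nothing to work with; a generic reader has to be handed the schema explicitly.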

Re: Example - Reading Avro Generic records

Posted by Sourigna Phetsarath <gn...@teamaol.com>.
There isn't a way yet, but I am proposing to do one:
https://issues.apache.org/jira/browse/FLINK-3691

On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <ta...@gmail.com> wrote:

> Hi,
>
> Can someone please point me to an example of creating DataSet using Avro
> Generic Records?
>
> I tried this code -
>
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     final Path iPath = new Path(args[0]);
>
>     DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
>         @Override
>         public Tuple2<Integer,String> map(GenericRecord record) {
>             Integer id = (Integer) record.get("id");
>             String userAgent = (String) record.get("user_agent");
>             return new Tuple2<>(id, userAgent);
>         }
>     }).writeAsText(args[1]);
>
>     env.execute();
>
> But I got an exception-
>
> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
> interface org.apache.avro.generic.GenericRecord
>     at
> org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>     at
> org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>     at
> org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>     at
> org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>     at
> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>     at
> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>     at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> By looking at StackTrace, I get that AvroInputFormat tries to read Avro
> file as SpecificRecords. Is there a way to read Avro file as GenericRecords?
>
>
> Thanks,
> Tarandeep
>



-- 


*Gna Phetsarath*
System Architect // AOL Platforms // Data Services // Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna

<http://www.aolplatforms.com>

Re: Example - Reading Avro Generic records

Posted by Sourigna Phetsarath <gn...@teamaol.com>.
Tarandeep,

Also, in your code example, when *reuseAvroValue* is *false* the code will
fail with this message:

java.lang.RuntimeException: The class
'org.apache.avro.generic.GenericRecord' is not instantiable: The class is
no proper class, it is either abstract, an interface, or a primitive type.
    at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:222)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:147)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:122)
    at ...


I encountered this while writing the PR.

-Gna
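The failure reported above is Flink's InstantiationUtil refusing to reflectively construct an interface. A dependency-free sketch of that pre-check follows; the suggested alternative of building the record as `new GenericData.Record(schema)` when reuseAvroValue is false is my assumption about a natural fix, not code from this thread:

```java
import java.lang.reflect.Modifier;

// Why InstantiationUtil.instantiate(GenericRecord.class, ...) must fail:
// an interface (or abstract class, or primitive) has no constructor to
// call reflectively. An input format that already holds the Schema could
// sidestep this by constructing the record directly, e.g.
// `new GenericData.Record(schema)` (assumption, not from the thread).
public class InstantiableCheck {
    static boolean instantiable(Class<?> c) {
        return !c.isInterface()
                && !Modifier.isAbstract(c.getModifiers())
                && !c.isPrimitive();
    }

    public static void main(String[] args) {
        System.out.println(instantiable(Runnable.class)); // false: interface
        System.out.println(instantiable(String.class));   // true: concrete class
    }
}
```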

On Thu, Apr 7, 2016 at 11:08 AM, Sourigna Phetsarath <
gna.phetsarath@teamaol.com> wrote:

> Tranadeep,
>
> Thanks for pasting your code!
>
> I have a PR ready that extends AvroInputFormat and will submit it soon.
>
> Still waiting for the legal team at AOL to approve it.
>
> -Gna
>
> On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh <ta...@gmail.com>
> wrote:
>
>> Thank you Gna for opening the ticket.
>>
>> I looked into AvroInputFormat code and inspired by it I wrote a
>> GenericAvroInputFormat. The code is awfully similar (and hence redundant)
>> to original AvroInputFormat, so it is a good idea to modify AvroInputFormat
>> in flink to support GenericRecord.
>>
>> Anyways, I am pasting the code here for anyone who wants to use it (till
>> your code is part of Flink stable release)-
>>
>> import java.io.IOException;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.file.DataFileReader;
>> import org.apache.avro.file.FileReader;
>> import org.apache.avro.file.SeekableInput;
>> import org.apache.avro.generic.GenericDatumReader;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.io.DatumReader;
>> import org.apache.flink.api.avro.FSDataInputStreamWrapper;
>> import org.apache.flink.api.common.io.FileInputFormat;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>> import org.apache.flink.api.java.typeutils.TypeExtractor;
>> import org.apache.flink.core.fs.FileInputSplit;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.util.InstantiationUtil;
>>
>> public class GenericAvroInputFormat extends FileInputFormat<GenericRecord> implements ResultTypeQueryable<GenericRecord> {
>>
>>     private transient long end;
>>     private transient Schema schema;
>>     private transient FileReader<GenericRecord> fileReader;
>>     private boolean reuseAvroValue = true;
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     public GenericAvroInputFormat(Path filePath, Schema schema) {
>>         super(filePath);
>>         this.schema = schema;
>>     }
>>
>>     public void setReuseAvroValue(boolean reuseAvroValue) {
>>         this.reuseAvroValue = reuseAvroValue;
>>     }
>>
>>     public void setUnsplittable(boolean unsplittable) {
>>         this.unsplittable = unsplittable;
>>     }
>>
>>     @Override
>>     public TypeInformation<GenericRecord> getProducedType() {
>>         return TypeExtractor.getForClass(GenericRecord.class);
>>     }
>>
>>     @Override
>>     public void open(FileInputSplit split) throws IOException {
>>         super.open(split);
>>         SeekableInput sin = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
>>         DatumReader<GenericRecord> reader = new GenericDatumReader<>();
>>         fileReader = DataFileReader.openReader(sin, reader);
>>         fileReader.sync(split.getStart());
>>         this.end = split.getStart() + split.getLength();
>>     }
>>
>>     @Override
>>     public boolean reachedEnd() throws IOException {
>>         return !fileReader.hasNext() || fileReader.pastSync(end);
>>     }
>>
>>     @Override
>>     public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
>>         if (reachedEnd()) {
>>             return null;
>>         }
>>
>>         if (!reuseAvroValue) {
>>             reuseValue = InstantiationUtil.instantiate(GenericRecord.class, Object.class);
>>         }
>>
>>         reuseValue = fileReader.next(reuseValue);
>>         return reuseValue;
>>     }
>> }
>>
>>
>> Usage:
>>
>> public static void main(String[] args) throws Exception {
>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>     final Path inPath = new Path(args[0]);
>>
>>     Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
>>     DataSet<GenericRecord> dataSet = env.createInput(new GenericAvroInputFormat(inPath, schema));
>>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Long,String>>() {
>>         @Override
>>         public Tuple2<Long,String> map(GenericRecord record) {
>>             Long id = (Long) record.get("id");
>>             String someString = record.get("somestring").toString();
>>             return new Tuple2<>(id, someString);
>>         }
>>     }).writeAsText(args[1]);
>>
>>     env.execute();
>> }
>>
>>
>> -Tarandeep
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
>> gna.phetsarath@teamaol.com> wrote:
>>
>>> Tarandeep,
>>>
>>> There isn't a way yet, but I am proposing to do one:
>>> https://issues.apache.org/jira/browse/FLINK-3691
>>>
>>> -Gna
>>>
>>> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <ta...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Can someone please point me to an example of creating DataSet using
>>>> Avro Generic Records?
>>>>
>>>> I tried this code -
>>>>
>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>     final Path iPath = new Path(args[0]);
>>>>
>>>>     DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
>>>>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
>>>>         @Override
>>>>         public Tuple2<Integer,String> map(GenericRecord record) {
>>>>             Integer id = (Integer) record.get("id");
>>>>             String userAgent = (String) record.get("user_agent");
>>>>             return new Tuple2<>(id, userAgent);
>>>>         }
>>>>     }).writeAsText(args[1]);
>>>>
>>>>     env.execute();
>>>>
>>>> But I got an exception-
>>>>
>>>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
>>>> interface org.apache.avro.generic.GenericRecord
>>>>     at
>>>> org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>>>>     at
>>>> org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>>>     at
>>>> org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>>>>     at
>>>> org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>>>>     at
>>>> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>>>>     at
>>>> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>>>>     at
>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> By looking at StackTrace, I get that AvroInputFormat tries to read Avro
>>>> file as SpecificRecords. Is there a way to read Avro file as GenericRecords?
>>>>
>>>>
>>>> Thanks,
>>>> Tarandeep
>>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
>




Re: Example - Reading Avro Generic records

Posted by Sourigna Phetsarath <gn...@teamaol.com>.
Tarandeep,

Thanks for pasting your code!

I have a PR ready that extends AvroInputFormat and will submit it soon.

Still waiting for the legal team at AOL to approve it.

-Gna

On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh <ta...@gmail.com> wrote:

> Thank you Gna for opening the ticket.
>
> I looked into AvroInputFormat code and inspired by it I wrote a
> GenericAvroInputFormat. The code is awfully similar (and hence redundant)
> to original AvroInputFormat, so it is a good idea to modify AvroInputFormat
> in flink to support GenericRecord.
>
> Anyways, I am pasting the code here for anyone who wants to use it (till
> your code is part of Flink stable release)-
>
> import java.io.IOException;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.file.FileReader;
> import org.apache.avro.file.SeekableInput;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.DatumReader;
> import org.apache.flink.api.avro.FSDataInputStreamWrapper;
> import org.apache.flink.api.common.io.FileInputFormat;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> import org.apache.flink.core.fs.FileInputSplit;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.util.InstantiationUtil;
>
> public class GenericAvroInputFormat extends FileInputFormat<GenericRecord> implements ResultTypeQueryable<GenericRecord> {
>
>     private transient long end;
>     private transient Schema schema;
>     private transient FileReader<GenericRecord> fileReader;
>     private boolean reuseAvroValue = true;
>
>     private static final long serialVersionUID = 1L;
>
>     public GenericAvroInputFormat(Path filePath, Schema schema) {
>         super(filePath);
>         this.schema = schema;
>     }
>
>     public void setReuseAvroValue(boolean reuseAvroValue) {
>         this.reuseAvroValue = reuseAvroValue;
>     }
>
>     public void setUnsplittable(boolean unsplittable) {
>         this.unsplittable = unsplittable;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeExtractor.getForClass(GenericRecord.class);
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
>         SeekableInput sin = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
>         DatumReader<GenericRecord> reader = new GenericDatumReader<>();
>         fileReader = DataFileReader.openReader(sin, reader);
>         fileReader.sync(split.getStart());
>         this.end = split.getStart() + split.getLength();
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return !fileReader.hasNext() || fileReader.pastSync(end);
>     }
>
>     @Override
>     public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
>         if (reachedEnd()) {
>             return null;
>         }
>
>         if (!reuseAvroValue) {
>             reuseValue = InstantiationUtil.instantiate(GenericRecord.class, Object.class);
>         }
>
>         reuseValue = fileReader.next(reuseValue);
>         return reuseValue;
>     }
> }
>
>
> Usage:
>
> public static void main(String[] args) throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     final Path inPath = new Path(args[0]);
>
>     Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
>     DataSet<GenericRecord> dataSet = env.createInput(new GenericAvroInputFormat(inPath, schema));
>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Long,String>>() {
>         @Override
>         public Tuple2<Long,String> map(GenericRecord record) {
>             Long id = (Long) record.get("id");
>             String someString = record.get("somestring").toString();
>             return new Tuple2<>(id, someString);
>         }
>     }).writeAsText(args[1]);
>
>     env.execute();
> }
>
>
> -Tarandeep
>
>
>
>
>
>
>
> On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
> gna.phetsarath@teamaol.com> wrote:
>
>> Tarandeep,
>>
>> There isn't a way yet, but I am proposing to do one:
>> https://issues.apache.org/jira/browse/FLINK-3691
>>
>> -Gna
>>
>> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <ta...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Can someone please point me to an example of creating DataSet using Avro
>>> Generic Records?
>>>
>>> I tried this code -
>>>
>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     final Path iPath = new Path(args[0]);
>>>
>>>     DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
>>>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
>>>         @Override
>>>         public Tuple2<Integer,String> map(GenericRecord record) {
>>>             Integer id = (Integer) record.get("id");
>>>             String userAgent = (String) record.get("user_agent");
>>>             return new Tuple2<>(id, userAgent);
>>>         }
>>>     }).writeAsText(args[1]);
>>>
>>>     env.execute();
>>>
>>> But I got an exception-
>>>
>>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
>>> interface org.apache.avro.generic.GenericRecord
>>>     at
>>> org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>>>     at
>>> org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>>     at
>>> org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>>>     at
>>> org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>>>     at
>>> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>>>     at
>>> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>>>     at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> By looking at StackTrace, I get that AvroInputFormat tries to read Avro
>>> file as SpecificRecords. Is there a way to read Avro file as GenericRecords?
>>>
>>>
>>> Thanks,
>>> Tarandeep
>>>
>>
>>
>>
>>
>
>



Re: Example - Reading Avro Generic records

Posted by Tarandeep Singh <ta...@gmail.com>.
Thank you Gna for opening the ticket.

I looked into the AvroInputFormat code and, inspired by it, wrote a
GenericAvroInputFormat. The code is awfully similar (and hence redundant)
to the original AvroInputFormat, so it would be a good idea to modify
AvroInputFormat in Flink to support GenericRecord.

Anyway, I am pasting the code here for anyone who wants to use it (until
your change is part of a stable Flink release):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;

public class GenericAvroInputFormat extends FileInputFormat<GenericRecord> implements ResultTypeQueryable<GenericRecord> {

    private transient long end;
    private transient Schema schema;
    private transient FileReader<GenericRecord> fileReader;
    private boolean reuseAvroValue = true;

    private static final long serialVersionUID = 1L;

    public GenericAvroInputFormat(Path filePath, Schema schema) {
        super(filePath);
        this.schema = schema;
    }

    public void setReuseAvroValue(boolean reuseAvroValue) {
        this.reuseAvroValue = reuseAvroValue;
    }

    public void setUnsplittable(boolean unsplittable) {
        this.unsplittable = unsplittable;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        SeekableInput sin = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        fileReader = DataFileReader.openReader(sin, reader);
        fileReader.sync(split.getStart());
        this.end = split.getStart() + split.getLength();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !fileReader.hasNext() || fileReader.pastSync(end);
    }

    @Override
    public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
        if (reachedEnd()) {
            return null;
        }

        if (!reuseAvroValue) {
            reuseValue = InstantiationUtil.instantiate(GenericRecord.class, Object.class);
        }

        reuseValue = fileReader.next(reuseValue);
        return reuseValue;
    }
}


Usage:

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final Path inPath = new Path(args[0]);

    Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
    DataSet<GenericRecord> dataSet = env.createInput(new GenericAvroInputFormat(inPath, schema));
    dataSet.map(new MapFunction<GenericRecord, Tuple2<Long,String>>() {
        @Override
        public Tuple2<Long,String> map(GenericRecord record) {
            Long id = (Long) record.get("id");
            String someString = record.get("somestring").toString();
            return new Tuple2<>(id, someString);
        }
    }).writeAsText(args[1]);

    env.execute();
}
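One detail in the usage above is worth calling out: it reads the string field with `record.get("somestring").toString()` rather than a `(String)` cast. Generic Avro decoding commonly yields string fields as `org.apache.avro.util.Utf8`, a CharSequence that is not a java.lang.String, so a direct cast (as in the earlier `(String) record.get("user_agent")`) can throw ClassCastException. A dependency-free illustration, with another CharSequence standing in for Utf8:

```java
// StringBuilder stands in for Avro's Utf8: both implement CharSequence
// but neither is a java.lang.String, so a (String) cast fails at runtime
// while toString() is always safe.
public class CharSequenceCast {
    public static void main(String[] args) {
        CharSequence field = new StringBuilder("Mozilla/5.0"); // stand-in for Utf8

        System.out.println(field.toString()); // safe conversion

        try {
            String s = (String) field; // compiles, but fails at runtime
            System.out.println(s);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException"); // this branch runs
        }
    }
}
```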


-Tarandeep







On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
gna.phetsarath@teamaol.com> wrote:

> Tarandeep,
>
> There isn't a way yet, but I am proposing to do one:
> https://issues.apache.org/jira/browse/FLINK-3691
>
> -Gna
>
> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <ta...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Can someone please point me to an example of creating DataSet using Avro
>> Generic Records?
>>
>> I tried this code -
>>
>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>     final Path iPath = new Path(args[0]);
>>
>>     DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
>>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
>>         @Override
>>         public Tuple2<Integer,String> map(GenericRecord record) {
>>             Integer id = (Integer) record.get("id");
>>             String userAgent = (String) record.get("user_agent");
>>             return new Tuple2<>(id, userAgent);
>>         }
>>     }).writeAsText(args[1]);
>>
>>     env.execute();
>>
>> But I got an exception-
>>
>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
>> interface org.apache.avro.generic.GenericRecord
>>     at
>> org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>>     at
>> org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>     at
>> org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>>     at
>> org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>>     at
>> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>>     at
>> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>>     at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> By looking at StackTrace, I get that AvroInputFormat tries to read Avro
>> file as SpecificRecords. Is there a way to read Avro file as GenericRecords?
>>
>>
>> Thanks,
>> Tarandeep
>>
>
>
>
>

Re: Example - Reading Avro Generic records

Posted by Sourigna Phetsarath <gn...@teamaol.com>.
Tarandeep,

There isn't a way yet, but I am proposing to do one:
https://issues.apache.org/jira/browse/FLINK-3691

-Gna

On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <ta...@gmail.com> wrote:

> Hi,
>
> Can someone please point me to an example of creating DataSet using Avro
> Generic Records?
>
> I tried this code -
>
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     final Path iPath = new Path(args[0]);
>
>     DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
>         @Override
>         public Tuple2<Integer,String> map(GenericRecord record) {
>             Integer id = (Integer) record.get("id");
>             String userAgent = (String) record.get("user_agent");
>             return new Tuple2<>(id, userAgent);
>         }
>     }).writeAsText(args[1]);
>
>     env.execute();
>
> But I got an exception-
>
> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
> interface org.apache.avro.generic.GenericRecord
>     at
> org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>     at
> org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>     at
> org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>     at
> org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>     at
> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>     at
> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>     at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> By looking at StackTrace, I get that AvroInputFormat tries to read Avro
> file as SpecificRecords. Is there a way to read Avro file as GenericRecords?
>
>
> Thanks,
> Tarandeep
>


