Posted to user@flink.apache.org by Tarandeep Singh <ta...@gmail.com> on 2016/04/01 10:04:48 UTC
Example - Reading Avro Generic records
Hi,
Can someone please point me to an example of creating DataSet using Avro
Generic Records?
I tried this code -
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final Path iPath = new Path(args[0]);

DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer, String>>() {
    @Override
    public Tuple2<Integer, String> map(GenericRecord record) {
        Integer id = (Integer) record.get("id");
        String userAgent = (String) record.get("user_agent");
        return new Tuple2<>(id, userAgent);
    }
}).writeAsText(args[1]);

env.execute();
But I got an exception-
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
    at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
From the stack trace, I gather that AvroInputFormat tries to read the Avro
file as SpecificRecords. Is there a way to read an Avro file as GenericRecords?
Thanks,
Tarandeep
Re: Example - Reading Avro Generic records
Posted by Sourigna Phetsarath <gn...@teamaol.com>.
Tarandeep,
There isn't a way yet, but I am proposing to add one:
https://issues.apache.org/jira/browse/FLINK-3691
-Gna
On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <ta...@gmail.com> wrote:
--
*Gna Phetsarath*
System Architect // AOL Platforms // Data Services // Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna
<http://www.aolplatforms.com>
Re: Example - Reading Avro Generic records
Posted by Sourigna Phetsarath <gn...@teamaol.com>.
Tarandeep,
Also, in your code example, when *reuseAvroValue* is *false* the code will
fail with this message:
java.lang.RuntimeException: The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is no proper class, it is either abstract, an interface, or a primitive type.
    at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:222)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:147)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:122)
    at
I had encountered this while writing the PR.
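The RuntimeException above is plain reflection at work: an interface has no constructor, so Flink's InstantiationUtil rejects GenericRecord before ever touching Avro. Here is a minimal pure-JDK sketch of that kind of instantiability check (the class and type names are illustrative, not Flink's actual code):

```java
// Toy version of an "is this type instantiable via newInstance()?" check.
// RecordLike stands in for the GenericRecord interface; ConcreteRecord
// stands in for a proper class such as GenericData.Record.
public class InterfaceInstantiationDemo {
    interface RecordLike {}
    static class ConcreteRecord implements RecordLike {}

    public static boolean instantiable(Class<?> clazz) {
        // Interfaces, abstract classes and primitives cannot be
        // constructed reflectively with a no-arg constructor.
        return !clazz.isInterface()
                && !java.lang.reflect.Modifier.isAbstract(clazz.getModifiers())
                && !clazz.isPrimitive();
    }

    public static void main(String[] args) {
        System.out.println(instantiable(RecordLike.class));      // false
        System.out.println(instantiable(ConcreteRecord.class));  // true
    }
}
```

This is why the reuseAvroValue=false path has to hand Avro a concrete record class (or simply pass null and let GenericDatumReader build the record), rather than instantiating the interface.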
-Gna
On Thu, Apr 7, 2016 at 11:08 AM, Sourigna Phetsarath <
gna.phetsarath@teamaol.com> wrote:
Re: Example - Reading Avro Generic records
Posted by Sourigna Phetsarath <gn...@teamaol.com>.
Tarandeep,
Thanks for pasting your code!
I have a PR ready that extends AvroInputFormat and will submit it soon.
Still waiting for the legal team at AOL to approve it.
-Gna
On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh <ta...@gmail.com> wrote:
Re: Example - Reading Avro Generic records
Posted by Tarandeep Singh <ta...@gmail.com>.
Thank you Gna for opening the ticket.
I looked into the AvroInputFormat code and, inspired by it, wrote a
GenericAvroInputFormat. The code is awfully similar (and hence redundant)
to the original AvroInputFormat, so it would be better to modify
AvroInputFormat in Flink to support GenericRecord.
Anyway, I am pasting the code here for anyone who wants to use it (until
your code is part of a stable Flink release):
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;
public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
        implements ResultTypeQueryable<GenericRecord> {

    private transient long end;
    private transient Schema schema;
    private transient FileReader<GenericRecord> fileReader;
    private boolean reuseAvroValue = true;

    private static final long serialVersionUID = 1L;

    public GenericAvroInputFormat(Path filePath, Schema schema) {
        super(filePath);
        this.schema = schema;
    }

    public void setReuseAvroValue(boolean reuseAvroValue) {
        this.reuseAvroValue = reuseAvroValue;
    }

    public void setUnsplittable(boolean unsplittable) {
        this.unsplittable = unsplittable;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        SeekableInput sin = new FSDataInputStreamWrapper(stream,
                split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        fileReader = DataFileReader.openReader(sin, reader);
        fileReader.sync(split.getStart());
        this.end = split.getStart() + split.getLength();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !fileReader.hasNext() || fileReader.pastSync(end);
    }

    @Override
    public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
        if (reachedEnd()) {
            return null;
        }

        if (!reuseAvroValue) {
            reuseValue = InstantiationUtil.instantiate(GenericRecord.class, Object.class);
        }

        reuseValue = fileReader.next(reuseValue);
        return reuseValue;
    }
}
Usage:
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final Path inPath = new Path(args[0]);

    Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
    DataSet<GenericRecord> dataSet = env.createInput(new GenericAvroInputFormat(inPath, schema));
    dataSet.map(new MapFunction<GenericRecord, Tuple2<Long, String>>() {
        @Override
        public Tuple2<Long, String> map(GenericRecord record) {
            Long id = (Long) record.get("id");
            String someString = record.get("somestring").toString();
            return new Tuple2<>(id, someString);
        }
    }).writeAsText(args[1]);

    env.execute();
}
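For what it's worth, the open()/reachedEnd() pair is what keeps the format splittable: each parallel reader seeks to the first Avro sync marker at or after its split start and consumes record blocks until it passes the split end, so every block is read by exactly one reader. A toy model of that bookkeeping (plain Java, no Avro; the offsets are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how sync(start) / pastSync(end) partition a file between splits.
// OFFSETS plays the role of sync-marker positions in a hypothetical file.
public class SplitReadingDemo {
    static final long[] OFFSETS = {0, 40, 80, 120, 160};

    static List<Long> readSplit(long start, long length) {
        long end = start + length;
        List<Long> consumed = new ArrayList<>();
        for (long off : OFFSETS) {
            if (off < start) continue; // like fileReader.sync(start): skip markers before the split
            if (off >= end) break;     // like pastSync(end): stop once past the split end
            consumed.add(off);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(readSplit(0, 100));    // [0, 40, 80]
        System.out.println(readSplit(100, 100));  // [120, 160]
    }
}
```

Note how the block starting at offset 80 belongs entirely to the first split even though its data may run past byte 100; the second split starts consuming at the next marker instead, so nothing is read twice.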
-Tarandeep
On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
gna.phetsarath@teamaol.com> wrote: