Posted to user@flink.apache.org by Ferenc Turi <tu...@gmail.com> on 2015/08/29 11:38:14 UTC

Kafka avro recordschema serializing / deserializing

Hi,

I tried to read Avro (RecordSchema) data from Kafka using the flink-kafka
connector, but I am running into a problem:

The following exception is thrown at program startup:

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

I know RecordSchema is not serializable, so the exception itself makes sense,
but how can I register a serializer for RecordSchema?

My Flink initialization:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.addSource(new KafkaSource("localhost:2181", "neverwinter", new MyDeserializer())).print();

The deserializer:

public class MyDeserializer implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
    private static final long serialVersionUID = -8314881700393464119L;
    private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();
    private Schema _schema;

    public MyDeserializer() {
        System.out.println("Creating MyDeserializer");
        Schema.Parser parser = new Schema.Parser();
        try {
            InputStream is = getClass().getResourceAsStream("/avro_schema.json");
            if (is != null) {
                _schema = parser.parse(is);
            } else {
                System.out.println("Unable to load schema file!");
            }
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }

    public String deserialize(byte[] message) {
        String data = null;
        try {
            // Decode the Avro binary payload into a GenericRecord and map it to a String.
            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
            GenericRecord result = reader.read(null, decoder);
            AvroKafkaData ad = new AvroKafkaData((Integer) result.get("id"), (Integer) result.get("random"), String.valueOf(result.get("data")));
            data = ad.toString();
            System.out.println("Read kafka data: " + data);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return data;
    }

    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    public byte[] serialize(String element) {
        System.out.println("Serializing element = " + element);
        byte[] data = null;
        try {
            // Parse the JSON-encoded element back into a GenericRecord and re-encode it as Avro binary.
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(_schema);
            ByteArrayOutputStream stream = new ByteArrayOutputStream();

            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
            Decoder decoder = DecoderFactory.get().jsonDecoder(_schema, element);
            GenericRecord r = reader.read(null, decoder);

            BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
            writer.write(r, binaryEncoder);
            binaryEncoder.flush();
            IOUtils.closeStream(stream);

            data = stream.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return data;
    }
}

Unfortunately, as far as I can see, only the constructor of MyDeserializer is called.

Could somebody suggest something?

Thanks,

Ferenc



-- 
Kind Regards,

Ferenc

Re: Kafka avro recordschema serializing / deserializing

Posted by Stephan Ewen <se...@apache.org>.
The functions/sources have an open() method that is exactly intended for
this type of initialization (constructing the Avro Schema).

You can try and subclass the Kafka source and override the open() method to
initialize the schema there. Make sure you call super.open().
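
For illustration, a rough sketch of what that could look like (untested; it assumes
the KafkaSource constructor used in your snippet, the class name and schema path are
just placeholders, imports are omitted, and the exact open() signature should mirror
the one in your Flink version):

public class SchemaAwareKafkaSource extends KafkaSource<String> {

    // transient: the Avro Schema is rebuilt on the worker, never Java-serialized
    private transient Schema schema;

    public SchemaAwareKafkaSource(String zkAddress, String topic, DeserializationSchema<String> deserializer) {
        super(zkAddress, topic, deserializer);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters); // important: lets the Kafka consumer set itself up first
        // Parse the schema here, after the source has been deserialized on the cluster.
        schema = new Schema.Parser().parse(getClass().getResourceAsStream("/avro_schema.json"));
    }
}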


Greetings,
Stephan



Re: Kafka avro recordschema serializing / deserializing

Posted by Robert Metzger <rm...@apache.org>.
Hi,
yes, the Avro Schema is not serializable.

Can you make the "_schema" field "transient" and then lazily initialize the
field when serialize()/deserialize() is called?
That way, you initialize the schema on the cluster, so there is no need to
transfer it over the network.
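
As a rough illustration of that idea (untested; only the parts that change are
shown, and the getSchema() helper name is just for the example):

public class MyDeserializer implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {

    // transient: the field is skipped by Java serialization and rebuilt on the workers
    private transient Schema _schema;

    private Schema getSchema() {
        if (_schema == null) {
            try {
                _schema = new Schema.Parser().parse(getClass().getResourceAsStream("/avro_schema.json"));
            } catch (IOException e) {
                throw new RuntimeException("Unable to load Avro schema", e);
            }
        }
        return _schema;
    }

    public String deserialize(byte[] message) {
        try {
            // Use the lazily created schema instead of the constructor-initialized field.
            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(getSchema());
            Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
            GenericRecord result = reader.read(null, decoder);
            return String.valueOf(result);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // serialize(), isEndOfStream() and getProducedType() stay as in the original,
    // with serialize() also calling getSchema() instead of reading _schema directly.
}

Since the schema is created where it is used, nothing non-serializable has to be
shipped with the job graph.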


I think Flink's own serialization stack should also be able to handle Avro
types with Kafka. I'm trying to get the required tooling into Flink
0.10-SNAPSHOT.

Let me know if you need more help.


Best,
Robert



