Posted to user@flink.apache.org by Filipe Couto <fi...@gmail.com> on 2018/03/02 17:42:57 UTC

Serialization and Deserialization of Avro messages stored in Kafka

Hello,

I have a few topics that I want to read from Kafka, which consist mainly of a
key-value pair: a timestamp (key) and a value (byte array).

The byte array doesn't really have a class to deserialize into, since the
Avro record we have comes from a "SELECT * FROM..." that selects several
SQL tables, and each topic represents one of those tables.

We're using a GenericRecord, and since we know the structure of the table
via the name of the topic, we know the column names and can read fields
like this: genericRecord.get("COLUMN_NAME").toString()

Given this, we're now trying to read a Kafka topic using Flink, and we have
this:

The environment is the StreamExecutionEnvironment, and the properties hold
the Kafka serializer/deserializer settings plus the Kafka and Zookeeper
addresses.
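
Roughly like this (a java.util.Properties object; the addresses below are
placeholders, not our real ones):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka-host:9092"); // Kafka brokers
properties.setProperty("zookeeper.connect", "zk-host:2181");    // Zookeeper ensemble
properties.setProperty("group.id", "my-consumer-group");        // consumer group id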

class...

DataStream<Object> messageStream = environment
        .addSource(new FlinkKafkaConsumer010<>(baseTopic,
                new MyDeserializationSchema(schema), properties));

messageStream.print();

try {
    environment.execute();
} catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

return false;
}
}

class MyDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final Logger log =
            LoggerFactory.getLogger(MyDeserializationSchema.class);

    private final Class<T> avrotype =
            (Class<T>) org.apache.avro.generic.GenericRecord.class;
    private final Schema schema;

    public MyDeserializationSchema(Schema schema) {
        this.schema = schema;
    }

    @Override
    public T deserialize(byte[] arg0) throws IOException {
        log.info("Starting deserialization");
        Injection<GenericRecord, byte[]> recordInjection =
                GenericAvroCodecs.toBinary(schema);
        log.info(recordInjection.toString());
        GenericRecord genericRecord = recordInjection.invert(arg0).get();
        log.info(genericRecord.toString());
        return (T) genericRecord;
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avrotype);
    }
}

Executing this on our server generates the following:

[2018-03-02 15:59:37,111] WARN Ignoring configured key DeSerializer (key.deserializer) (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09)

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumer09 is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
        at com.i2s.analytics.flink.executors.LKTreatyExecutor.execute(LKTreatyExecutor.java:153)
        at com.i2s.analytics.flink.job.DependenciesConsumer.main(DependenciesConsumer.java:66)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
        ... 6 more


I can't understand why the logs refer to FlinkKafkaConsumer09 when we're
using FlinkKafkaConsumer010.
Also, how can we deserialize to a GenericRecord so that we can access the
record fields the way we do when we read the Kafka topic without Flink?


Thanks in advance for any help.

Re: Serialization and Deserialization of Avro messages stored in Kafka

Posted by Tzu-Li Tai <tz...@gmail.com>.
Hi Filipe,

What Gordon mentioned is correct. Did you manage to fix the issue?

From your code snippet, it looks like the `Schema` field may not be
serializable. Could you double check that?
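
In case it helps, here is a minimal sketch of one way to avoid the
non-serializable field, keeping the Bijection approach from your snippet:
store the schema as its JSON string (a plain String, which is serializable)
and rebuild the `Schema` and `Injection` lazily on the worker. The imports
are what I'd expect given your snippet, so please double check them against
your actual dependencies:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

class MyDeserializationSchema implements DeserializationSchema<GenericRecord> {

    // The JSON form of the schema is a plain String, so it serializes fine.
    private final String schemaString;

    // Rebuilt on the worker on first use; transient fields are skipped by
    // Java serialization, so they never trip the ClosureCleaner.
    private transient Schema schema;
    private transient Injection<GenericRecord, byte[]> recordInjection;

    public MyDeserializationSchema(Schema schema) {
        this.schemaString = schema.toString();
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        if (recordInjection == null) {
            schema = new Schema.Parser().parse(schemaString);
            recordInjection = GenericAvroCodecs.toBinary(schema);
        }
        return recordInjection.invert(message).get();
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}

The same trick works for any other non-serializable field: keep a
serializable representation and rebuild the heavy object lazily on first
use.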

Cheers,
Gordon




Re: Serialization and Deserialization of Avro messages stored in Kafka

Posted by Gordon Weakliem <gw...@sovrn.com>.
The 010 consumer extends the 09 consumer, so I'd guess whatever code is
doing the reporting is seeing FlinkKafkaConsumer09, the superclass of your
FlinkKafkaConsumer010.
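
You can check that quickly yourself, e.g. (assuming the connector is on
your classpath):

// Prints the direct superclass; should show
// org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
System.out.println(FlinkKafkaConsumer010.class.getSuperclass().getName());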

I've seen this error a bunch, and it means MyDeserializationSchema isn't
serializable, or more likely one of its fields isn't, or one of the fields
of its fields: everything in the object graph has to be serializable.

Probably the easiest way to catch that is to write a unit test that makes
sure MyDeserializationSchema is serializable, essentially a test that
ObjectOutputStream.writeObject will work on it. That's a pretty useful
test, because you find out during the test phase that a change to
MyDeserializationSchema will break at runtime, instead of waiting until
the deploy/run stage. A rough sketch follows below.
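
Something like this, roughly (assuming JUnit 4 and a trivial inline Avro
schema; adjust to your actual constructor). With your current code it will
fail with the same NotSerializableException, and it should pass once the
Schema field is dealt with:

import static org.junit.Assert.assertNotNull;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.avro.Schema;
import org.junit.Test;

public class MyDeserializationSchemaTest {

    @Test
    public void isJavaSerializable() throws Exception {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Row\",\"fields\":"
                        + "[{\"name\":\"COLUMN_NAME\",\"type\":\"string\"}]}");
        MyDeserializationSchema deserSchema = new MyDeserializationSchema(schema);

        // Write the object out the same way Flink's ClosureCleaner does;
        // a non-serializable field throws NotSerializableException here.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(deserSchema);
        }

        // Read it back to make sure the round trip really works.
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            assertNotNull(in.readObject());
        }
    }
}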


