Posted to users@kafka.apache.org by chinchu chinchu <ch...@gmail.com> on 2019/01/23 17:28:16 UTC
Error deserializing Avro message when using SpecificSerde
Hey folks,
I am getting the error below when reading data from a Kafka topic. I
serialized this data with the Confluent serializers, but when consuming
with the Confluent deserializer I run into the error below. Any idea
what the issue could be here? Also, how do I skip this record and read
the next one?
10:17:32.924 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Sending Heartbeat request to
coordinator xxx.com:9092 (id: 2147483645 rack: null)
10:17:33.144 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Received successful Heartbeat
response
10:17:34.872 [pool-1-thread-1] DEBUG
io.confluent.kafka.schemaregistry.client.rest.RestService - Sending GET
with input null to http://xxx.yy.com:8081/schemas/ids/321
10:17:35.983 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Sending Heartbeat request to
coordinator xxx.com:9092 (id: 2147483645 rack: null)
10:17:36.203 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Received successful Heartbeat
response
10:17:39.039 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Sending Heartbeat request to
coordinator xxx.com:9092 (id: 2147483645 rack: null)
10:17:39.257 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Received successful Heartbeat
response
org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition Logs-0 at offset 25106200.
If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 321
Caused by: org.apache.kafka.common.errors.SerializationException: Could not
find class com.test.model.avro.Log specified in writer's schema whilst
finding reader's schema for a SpecificRecord.
// Consumer configs:
Properties props = new Properties();
props.put("bootstrap.servers", "xxx:9092,yyy:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://xxx:8081");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("max.poll.records", "100");
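On the "skip this record" part of the question: the stack trace names the bad record's partition and offset ("partition Logs-0 at offset 25106200"), so one option, as the exception message itself suggests, is to catch the SerializationException, parse the topic-partition and offset out of its message, and seek() one offset past it. A minimal sketch of the parsing half follows; the SkipBadRecord class name and the regex are my own, and the consumer.seek() call is shown only in comments because it needs a live consumer:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract "partition <topic>-<p> at offset <o>" from the
// SerializationException message and compute the offset to seek to.
public class SkipBadRecord {
    private static final Pattern MSG =
            Pattern.compile("partition (\\S+)-(\\d+) at offset (\\d+)");

    // Returns {topic, partition, offsetToSeekTo}, or null if the
    // message does not match the expected shape.
    static String[] nextOffset(String message) {
        Matcher m = MSG.matcher(message);
        if (!m.find()) return null;
        long next = Long.parseLong(m.group(3)) + 1; // skip past the bad record
        return new String[] { m.group(1), m.group(2), Long.toString(next) };
    }

    public static void main(String[] args) {
        String[] tp = nextOffset(
                "Error deserializing key/value for partition Logs-0 at offset 25106200.");
        System.out.println(tp[0] + "/" + tp[1] + " -> seek to " + tp[2]);
        // In the poll loop, roughly:
        //   try {
        //       records = consumer.poll(Duration.ofSeconds(1));
        //   } catch (SerializationException e) {
        //       // parse e.getMessage() as above, then:
        //       consumer.seek(new TopicPartition(topic, partition), offsetToSeekTo);
        //   }
    }
}
```

Note that skipping silently drops the record, so you may want to log it (or send it to a dead-letter topic) before seeking.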
Re: Error deserializing Avro message when using SpecificSerde
Posted by chinchu chinchu <ch...@gmail.com>.
My application did not have the com.test.model.avro.Log class on the
classpath at runtime. The log messages indicated this; it was just my
oversight.
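For anyone hitting the same error: the deserializer complains like this when specific.avro.reader is enabled but the generated SpecificRecord class named in the writer's schema is not visible to the consumer JVM. A quick way to confirm that before digging further (the helper name is mine):

```java
// Check whether the SpecificRecord class named in the writer's schema
// is visible on this JVM's classpath.
public class ClasspathCheck {
    static boolean onClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Prints false unless the generated Avro class is packaged with the app.
        System.out.println(onClasspath("com.test.model.avro.Log"));
    }
}
```

If it prints false, make sure the artifact containing the Avro-generated classes is actually packaged with the consumer, not just present at compile time.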