You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Vishwas (JIRA)" <ji...@apache.org> on 2019/04/14 13:52:00 UTC
[jira] [Created] (BEAM-7073) AvroUtils converting generic record to
Beam Row causes class cast exception
Vishwas created BEAM-7073:
-----------------------------
Summary: AvroUtils converting generic record to Beam Row causes class cast exception
Key: BEAM-7073
URL: https://issues.apache.org/jira/browse/BEAM-7073
Project: Beam
Issue Type: Bug
Components: sdk-java-core
Affects Versions: 2.11.0
Environment: Direct Runner
Reporter: Vishwas
Below is my pipeline:
KafkaSource (KafkaIo.read) -----------> Pardo -------> BeamSql------> KafkaSink (KafkaIO.write)
Kafka Source IO reads from Kafka topic avro records and deserializes it to generic record using below
KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, GenericRecord>read()
.withBootstrapServers(bootstrapServerUrl)
.withTopic(topicName)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder(GenericAvroDeserializer.class,
AvroCoder.of(GenericRecord.class, avroSchema))
.updateConsumerProperties(ImmutableMap.of("schema.registry.url",
schemaRegistryUrl));
Avro schema of the topic has a logicaltype (timestamp-millis). This is deserialized to
joda-time.
{
"name": "timeOfRelease",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp"
}
],
"default": null,
}
Now in my Pardo transform, I am trying to use the AvroUtils class methods to convert the generic record to Beam Row and getting below class cast exception
AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
This looks like a bug as joda time type created as part of deserialization is being type casted to Long in below code.
else if (logicalType instanceof LogicalTypes.TimestampMillis) {
return convertDateTimeStrict((Long) value, fieldType);
}
PS: I also used the avro-tools 1.8.2 jar to get the classes for the mentioned avro schema and I see that the attribute with timestamp-millis logical type is being converted to joda-time.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)