Posted to issues@beam.apache.org by "Ryan Skraba (Jira)" <ji...@apache.org> on 2019/10/11 14:56:00 UTC
[jira] [Commented] (BEAM-7073) AvroUtils converting generic record
to Beam Row causes class cast exception
[ https://issues.apache.org/jira/browse/BEAM-7073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949523#comment-16949523 ]
Ryan Skraba commented on BEAM-7073:
-----------------------------------
I agree -- I noted that the PR above doesn't have a unit test, and I have one for exactly this case. I'll do a PR (but I'm pretty convinced that the bug as described is fixed).
> AvroUtils converting generic record to Beam Row causes class cast exception
> ---------------------------------------------------------------------------
>
> Key: BEAM-7073
> URL: https://issues.apache.org/jira/browse/BEAM-7073
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.11.0
> Environment: Direct Runner
> Reporter: Vishwas
> Assignee: Ryan Skraba
> Priority: Major
>
> Below is my pipeline:
> KafkaSource (KafkaIO.read) -----------> ParDo -------> BeamSql------> KafkaSink (KafkaIO.write)
> The Kafka source IO reads Avro records from the Kafka topic and deserializes them to generic records using the code below:
>     KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, GenericRecord>read()
>         .withBootstrapServers(bootstrapServerUrl)
>         .withTopic(topicName)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>             AvroCoder.of(GenericRecord.class, avroSchema))
>         .updateConsumerProperties(ImmutableMap.of("schema.registry.url",
>             schemaRegistryUrl));
> The Avro schema of the topic has a field with a logical type (timestamp-millis). This field is deserialized to a Joda-Time DateTime.
>     {
>       "name": "timeOfRelease",
>       "type": [
>         "null",
>         {
>           "type": "long",
>           "logicalType": "timestamp-millis",
>           "connect.version": 1,
>           "connect.name": "org.apache.kafka.connect.data.Timestamp"
>         }
>       ],
>       "default": null
>     }
> Now, in my ParDo transform, I am trying to use the AvroUtils class to convert the generic record to a Beam Row, and I get the class cast exception below:
> AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
> at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
> at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>
> This looks like a bug: the Joda-Time value created during deserialization is being cast to Long in the code below.
>     else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>       return convertDateTimeStrict((Long) value, fieldType);
>     }
> PS: I also used the avro-tools 1.8.2 jar to generate the classes for the Avro schema above, and I see that the field with the timestamp-millis logical type is represented as a Joda-Time type.
>
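For illustration only, the failure mode described above can be reproduced without Beam or Avro. The sketch below is not the AvroUtils code and not the fix from the PR; the class name TimestampMillisSketch and the helper toInstant are hypothetical, and java.util.Date is used purely as a stand-in for org.joda.time.DateTime so that the example runs on the JDK alone. It shows why an unconditional (Long) cast fails once a logical-type conversion has already produced a date object, and how a type check can accept both representations.

```java
import java.time.Instant;
import java.util.Date;

public class TimestampMillisSketch {

  // Hypothetical helper (not part of Beam): accepts either the raw
  // epoch-millis Long or an already-converted date object.
  // java.util.Date is only a stand-in for org.joda.time.DateTime here.
  static Instant toInstant(Object value) {
    if (value == null) {
      // The field in the schema above is a nullable union.
      return null;
    }
    if (value instanceof Long) {
      return Instant.ofEpochMilli((Long) value);
    }
    if (value instanceof Date) {
      return Instant.ofEpochMilli(((Date) value).getTime());
    }
    throw new IllegalArgumentException(
        "Unsupported timestamp-millis value type: " + value.getClass().getName());
  }

  public static void main(String[] args) {
    Object raw = 1570805760000L;                 // what the strict conversion expects
    Object converted = new Date(1570805760000L); // what a logical-type conversion may hand over

    // Both representations resolve to the same instant.
    System.out.println(toInstant(raw).equals(toInstant(converted)));

    // The unconditional cast from the quoted snippet fails on the converted value.
    try {
      Long unchecked = (Long) converted;
      System.out.println(unchecked);
    } catch (ClassCastException e) {
      System.out.println("unchecked cast failed: " + e.getMessage());
    }
  }
}
```

Whether the conversion should accept both representations or require callers to disable the logical-type conversion is a design choice this sketch does not settle.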