Posted to issues@beam.apache.org by "Ryan Skraba (Jira)" <ji...@apache.org> on 2019/10/11 14:56:00 UTC

[jira] [Commented] (BEAM-7073) AvroUtils converting generic record to Beam Row causes class cast exception

    [ https://issues.apache.org/jira/browse/BEAM-7073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949523#comment-16949523 ] 

Ryan Skraba commented on BEAM-7073:
-----------------------------------

I agree -- I noted that the PR above doesn't have a unit test, and I have one for exactly this case.  I'll do a PR (but I'm pretty convinced that the bug as described is fixed).

> AvroUtils converting generic record to Beam Row causes class cast exception
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-7073
>                 URL: https://issues.apache.org/jira/browse/BEAM-7073
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.11.0
>         Environment: Direct Runner
>            Reporter: Vishwas
>            Assignee: Ryan Skraba
>            Priority: Major
>
> Below is my pipeline:
> KafkaSource (KafkaIO.read) -----------> ParDo -------> BeamSql ------> KafkaSink (KafkaIO.write)
> The Kafka source reads Avro records from the Kafka topic and deserializes them to GenericRecord as below:
> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>     KafkaIO.<String, GenericRecord>read()
>         .withBootstrapServers(bootstrapServerUrl)
>         .withTopic(topicName)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>             AvroCoder.of(GenericRecord.class, avroSchema))
>         .updateConsumerProperties(ImmutableMap.of("schema.registry.url", schemaRegistryUrl));
> The topic's Avro schema has a field with the logical type timestamp-millis, which is deserialized to a Joda-Time DateTime:
>           {
>             "name": "timeOfRelease",
>             "type": [
>                 "null",
>                 {
>                     "type": "long",
>                     "logicalType": "timestamp-millis",
>                     "connect.version": 1,
>                     "connect.name": "org.apache.kafka.connect.data.Timestamp"
>                 }
>             ],
>             "default": null,
>        }
> In my ParDo transform, I use the AvroUtils class methods to convert the GenericRecord to a Beam Row, which throws the class cast exception below:
>              AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
>     at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>     at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
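The failure can be reproduced without Beam on the classpath. A minimal standalone sketch, with java.util.Date standing in for org.joda.time.DateTime purely for illustration (any non-Long instant type produced by the deserializer's logical-type conversion triggers the same cast failure):

```java
// Minimal reproduction of the reported ClassCastException.
// Assumption: java.util.Date stands in for org.joda.time.DateTime, which
// is not on this sketch's classpath.
public class CastFailureSketch {
    public static void main(String[] args) {
        // What the deserializer actually produced for the timestamp-millis field:
        Object fieldValue = new java.util.Date(1554822000000L);
        try {
            // Mirrors the unconditional (Long) cast in AvroUtils.convertAvroFieldStrict:
            long millis = (Long) fieldValue;
            System.out.println(millis);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```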
>  
> This looks like a bug: the Joda-Time DateTime created during deserialization is cast to Long in the code below.
>       else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>               return convertDateTimeStrict((Long) value, fieldType);
>       }
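A defensive conversion could accept either representation instead of casting unconditionally. A minimal standalone sketch, using a hypothetical MillisInstant class as a stand-in for org.joda.time.DateTime (a real fix inside AvroUtils would test for org.joda.time.ReadableInstant instead):

```java
// Sketch of a defensive timestamp-millis conversion.
// MillisInstant is a hypothetical stand-in for org.joda.time.DateTime,
// used only to keep this sketch self-contained.
public class TimestampConversionSketch {

    /** Stand-in for a Joda-Time instant (hypothetical, for illustration only). */
    static class MillisInstant {
        private final long millis;
        MillisInstant(long millis) { this.millis = millis; }
        long getMillis() { return millis; }
    }

    /**
     * Accepts either a raw Long (plain Avro decoding) or an already-converted
     * instant (logical-type conversions enabled), instead of blindly casting to Long.
     */
    static long toEpochMillis(Object value) {
        if (value instanceof Long) {
            return (Long) value;
        } else if (value instanceof MillisInstant) {
            return ((MillisInstant) value).getMillis();
        }
        throw new IllegalArgumentException(
            "Unexpected type for timestamp-millis: " + value.getClass());
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis(1554822000000L));
        System.out.println(toEpochMillis(new MillisInstant(1554822000000L)));
    }
}
```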
> PS: I also used the avro-tools 1.8.2 jar to generate classes for the above Avro schema, and the field with the timestamp-millis logical type is likewise mapped to a Joda-Time type.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)