Posted to dev@kafka.apache.org by "Randall Hauch (JIRA)" <ji...@apache.org> on 2017/06/01 21:02:04 UTC

[jira] [Updated] (KAFKA-5164) SetSchemaMetadata does not replace the schemas in structs correctly

     [ https://issues.apache.org/jira/browse/KAFKA-5164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch updated KAFKA-5164:
---------------------------------
    Status: Patch Available  (was: Open)

> SetSchemaMetadata does not replace the schemas in structs correctly
> -------------------------------------------------------------------
>
>                 Key: KAFKA-5164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5164
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.1
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Randall Hauch
>
> In SetSchemaMetadataTest we verify that the name and version of the schema in the record have been replaced:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java#L62
> However, in the case of Structs, the schema is attached to both the record and the Struct itself. We correctly rebuild the record:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L77
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L104
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L119
> But if the key or value is a Struct, it still contains the old schema embedded in the Struct.
> Ultimately, this can cause validation in other code (here, the Avro converter) to fail, even for very simple changes such as renaming a schema:
> {code}
> (org.apache.kafka.connect.runtime.WorkerTask:141)
> org.apache.kafka.connect.errors.DataException: Mismatching struct schema
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:471)
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
>     at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The fix is probably to check, whenever we apply a new schema, whether the key or value is a Struct and, if so, copy it into a new Struct built with the updated schema.
> This issue only appears when a transformation changes the schema without modifying the data, so I think SetSchemaMetadata is currently the only transformation affected.
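The mismatch described above, and the proposed copy-into-a-new-Struct fix, can be sketched as follows. This is a minimal illustration, not the patch attached to this issue: `SimpleSchema`, `SimpleStruct`, `checkStructSchema` and `updateSchemaIn` are hypothetical stand-ins for Connect's `Schema`, `Struct`, the converter's schema check, and the fix, so the example runs with only the JDK. It assumes the updated schema has exactly the same fields as the original (true when only the name or version changes). In Connect itself the same idea would presumably construct `new Struct(updatedSchema)` and copy each field's value from the original Struct.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SchemaMetadataSketch {

    // Hypothetical stand-in for a Connect schema: name, version, ordered field names.
    static final class SimpleSchema {
        final String name;
        final Integer version;
        final List<String> fields;

        SimpleSchema(String name, Integer version, List<String> fields) {
            this.name = name;
            this.version = version;
            this.fields = List.copyOf(fields);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SimpleSchema)) return false;
            SimpleSchema other = (SimpleSchema) o;
            return Objects.equals(name, other.name)
                && Objects.equals(version, other.version)
                && fields.equals(other.fields);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, version, fields);
        }
    }

    // Hypothetical stand-in for a Struct: like Connect's Struct, it keeps its own schema.
    static final class SimpleStruct {
        final SimpleSchema schema;
        final Map<String, Object> values = new LinkedHashMap<>();

        SimpleStruct(SimpleSchema schema) {
            this.schema = schema;
        }

        SimpleStruct put(String field, Object value) {
            if (!schema.fields.contains(field)) {
                throw new IllegalArgumentException("Unknown field: " + field);
            }
            values.put(field, value);
            return this;
        }

        Object get(String field) {
            return values.get(field);
        }
    }

    // Mimics a converter-side check: the record's schema must equal the struct's schema.
    static void checkStructSchema(SimpleSchema recordSchema, Object value) {
        if (value instanceof SimpleStruct && !((SimpleStruct) value).schema.equals(recordSchema)) {
            throw new IllegalStateException("Mismatching struct schema");
        }
    }

    // Proposed fix (sketch): rebuild a struct so it carries the updated schema.
    // Assumes updatedSchema has the same field names as the struct's original schema.
    static Object updateSchemaIn(Object keyOrValue, SimpleSchema updatedSchema) {
        if (keyOrValue instanceof SimpleStruct) {
            SimpleStruct original = (SimpleStruct) keyOrValue;
            SimpleStruct copy = new SimpleStruct(updatedSchema);
            for (String field : updatedSchema.fields) {
                copy.put(field, original.get(field));
            }
            return copy;
        }
        return keyOrValue; // non-struct values do not embed a schema
    }

    public static void main(String[] args) {
        SimpleSchema oldSchema = new SimpleSchema("old.Name", 1, List.of("id"));
        SimpleSchema newSchema = new SimpleSchema("new.Name", 2, List.of("id"));
        SimpleStruct value = new SimpleStruct(oldSchema).put("id", 42);

        // Replacing only the record-level schema leaves the struct's schema stale.
        try {
            checkStructSchema(newSchema, value);
        } catch (IllegalStateException e) {
            System.out.println("Without fix: " + e.getMessage());
        }

        // Rebuilding the struct keeps the record schema and the struct schema in sync.
        Object fixed = updateSchemaIn(value, newSchema);
        checkStructSchema(newSchema, fixed);
        System.out.println("With fix: id=" + ((SimpleStruct) fixed).get("id"));
    }
}
```

Copying the field values, rather than mutating the existing Struct's schema, keeps the original record's data untouched, which matters if anything upstream still holds a reference to it.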



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)