Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/06/01 21:01:04 UTC

[jira] [Commented] (KAFKA-5164) SetSchemaMetadata does not replace the schemas in structs correctly

    [ https://issues.apache.org/jira/browse/KAFKA-5164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033687#comment-16033687 ] 

ASF GitHub Bot commented on KAFKA-5164:
---------------------------------------

GitHub user rhauch opened a pull request:

    https://github.com/apache/kafka/pull/3198

    KAFKA-5164 Ensure SetSchemaMetadata updates key or value when Schema changes

    When the `SetSchemaMetadata` SMT is used to change the name and/or version of the key or value’s schema, any references to the old schema in the key or value must be changed to reference the new schema. Only keys or values that are `Struct` have such references, and so currently only these are adjusted.
    
    This is based on `trunk` because the fix is expected to target the 0.11.1 release.
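Below is a minimal sketch of the adjustment the PR describes. It uses hypothetical stand-in classes (`SimpleSchema`, `SimpleStruct`, `SchemaUpdate.updateSchemaIn`) rather than the real Kafka Connect `Schema`/`Struct` API, so it compiles without Kafka on the classpath. It assumes the new schema differs from the old one only in name and/or version, so every field can be copied across unchanged. The actual patch may be structured differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a struct schema: a name, a version and an ordered list of field names.
final class SimpleSchema {
    final String name;
    final Integer version;
    final List<String> fields;

    SimpleSchema(String name, Integer version, List<String> fields) {
        this.name = name;
        this.version = version;
        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
    }
}

// Hypothetical stand-in for a Struct: like a Connect Struct, it keeps a reference to its own schema.
final class SimpleStruct {
    final SimpleSchema schema;
    private final Map<String, Object> values = new LinkedHashMap<>();

    SimpleStruct(SimpleSchema schema) {
        this.schema = schema;
    }

    SimpleStruct put(String field, Object value) {
        if (!schema.fields.contains(field)) {
            throw new IllegalArgumentException("Unknown field: " + field);
        }
        values.put(field, value);
        return this;
    }

    Object get(String field) {
        return values.get(field);
    }
}

final class SchemaUpdate {
    // Returns the key or value re-pointed at updatedSchema.
    // Only structs embed a schema reference, so only they are rebuilt; other values pass through.
    // Assumes updatedSchema has the same fields as the struct's original schema.
    static Object updateSchemaIn(Object keyOrValue, SimpleSchema updatedSchema) {
        if (keyOrValue instanceof SimpleStruct) {
            SimpleStruct original = (SimpleStruct) keyOrValue;
            SimpleStruct rebuilt = new SimpleStruct(updatedSchema);
            for (String field : updatedSchema.fields) {
                rebuilt.put(field, original.get(field));
            }
            return rebuilt;
        }
        return keyOrValue;
    }
}
```

In a transformation, the new record would then carry both the updated schema and the result of `updateSchemaIn(value, updatedSchema)`, so the declared schema and the struct's embedded schema agree. Copying into a fresh struct (instead of mutating the original) leaves the input record untouched.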

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rhauch/kafka kafka-5164

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3198.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3198
    
----
commit 68bf68b3df065437318e2a87a6d1181b1205a806
Author: Randall Hauch <rh...@gmail.com>
Date:   2017-06-01T20:40:01Z

    KAFKA-5164 Ensure SetSchemaMetadata updates key or value when Schema changes
    
    When the `SetSchemaMetadata` SMT is used to change the name and/or version of the key or value’s schema, any references to the old schema in the key or value must be changed to reference the new schema. Only keys or values that are `Struct` have such references, and so currently only these are adjusted.

----


> SetSchemaMetadata does not replace the schemas in structs correctly
> -------------------------------------------------------------------
>
>                 Key: KAFKA-5164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5164
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.1
>            Reporter: Ewen Cheslack-Postava
>
> In SetSchemaMetadataTest we verify that the name and version of the schema in the record have been replaced:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java#L62
> However, in the case of Structs, the schema is attached to both the record and the Struct itself. We do correctly rebuild the record:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L77
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L104
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L119
> But if the key or value is a Struct, it still contains the old schema embedded in the Struct.
> Ultimately this can make validation in other code fail (here, in the Avro converter), even for very simple changes like adjusting the name of a schema:
> {code}
> (org.apache.kafka.connect.runtime.WorkerTask:141)
> org.apache.kafka.connect.errors.DataException: Mismatching struct schema
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:471)
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
>     at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The solution is probably to check whether the key or value is a Struct when we apply a new schema and, if so, copy it into a new Struct that uses the new schema.
> This issue only appears when a transformation changes the schema without modifying the data (transformations that modify the data already build new Structs against their new schema), so I think SetSchemaMetadata is currently the only transformation affected.
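The failure mode above can be reproduced in a few lines. This sketch uses hypothetical stand-ins (`StructSchema`, `StructValue`, `SchemaCheck.validate`) instead of the real Connect and Avro converter classes. `validate` is only loosely modeled on the converter's "Mismatching struct schema" check and assumes that check compares the record's declared schema with the struct's embedded schema for equality.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a struct schema; equality covers name, version and fields.
final class StructSchema {
    final String name;
    final Integer version;
    final List<String> fields;

    StructSchema(String name, Integer version, List<String> fields) {
        this.name = name;
        this.version = version;
        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
    }

    // Returns a copy that differs only in name and version, like a metadata-only change.
    StructSchema withMetadata(String newName, Integer newVersion) {
        return new StructSchema(newName, newVersion, fields);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof StructSchema)) return false;
        StructSchema other = (StructSchema) o;
        return Objects.equals(name, other.name)
                && Objects.equals(version, other.version)
                && fields.equals(other.fields);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, version, fields);
    }
}

// Hypothetical stand-in for a Struct value that embeds the schema it was created with.
final class StructValue {
    final StructSchema schema;

    StructValue(StructSchema schema) {
        this.schema = schema;
    }
}

final class SchemaCheck {
    // Converter-style validation: a struct must carry the same schema the record declares.
    static void validate(StructSchema declared, Object value) {
        if (value instanceof StructValue && !((StructValue) value).schema.equals(declared)) {
            throw new IllegalStateException("Mismatching struct schema");
        }
    }
}
```

With `renamed = original.withMetadata(...)`, calling `validate(renamed, new StructValue(original))` throws even though the fields are identical, which mirrors renaming only the record-level schema. Validating a struct rebuilt with `renamed` passes.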



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)