Posted to dev@kafka.apache.org by "Benjamin Bargeton (JIRA)" <ji...@apache.org> on 2017/06/15 11:40:00 UTC

[jira] [Comment Edited] (KAFKA-5398) Joins on GlobalKTable don't work properly when combined with Avro and the Confluent Schema Registry

    [ https://issues.apache.org/jira/browse/KAFKA-5398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050225#comment-16050225 ] 

Benjamin Bargeton edited comment on KAFKA-5398 at 6/15/17 11:39 AM:
--------------------------------------------------------------------

For sure, but the issue can still happen with the same serdes.
Let's say we have:
* a {{KStream}} backed by a topic ({{stream-topic}}) fed by application {{A}}
* a {{KTable}} or a {{GlobalKTable}} backed by another topic ({{table-topic}}) fed by application {{B}}
* {{table-topic}} written by application {{B}} with an Avro-serialized key (schema {{v1}})

Now let's say application {{B}} updates its schema to {{v2}} (adding a new field with a default value, for example).
{{table-topic}} now contains some {{v1}}-serialized keys and some {{v2}}-serialized keys.
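A minimal sketch of why this matters (the schema names and the added field are hypothetical, built here with Avro's {{SchemaBuilder}}): the same logical key serialized under {{v1}} and {{v2}} already produces different raw Avro bytes, even before any Schema Registry framing is added.
{code:java}
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionBytes {

    static byte[] serialize(GenericRecord record, Schema schema) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        // v1: a single string field (mirrors the AvroKey schema from the issue)
        Schema v1 = SchemaBuilder.record("AvroKey").namespace("com.test.key")
                .fields().requiredString("anyfield").endRecord();
        // v2: backwards-compatible evolution, one new field with a default (hypothetical)
        Schema v2 = SchemaBuilder.record("AvroKey").namespace("com.test.key")
                .fields().requiredString("anyfield")
                .name("newfield").type().stringType().stringDefault("n/a")
                .endRecord();

        GenericRecord keyV1 = new GenericData.Record(v1);
        keyV1.put("anyfield", "same-logical-key");
        GenericRecord keyV2 = new GenericData.Record(v2);
        keyV2.put("anyfield", "same-logical-key");
        keyV2.put("newfield", "n/a");

        // Same logical key, but v2 also encodes the new field -> different bytes
        System.out.println(Arrays.equals(serialize(keyV1, v1), serialize(keyV2, v2))); // false
    }
}
{code}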

In our streams app, starting from the {{KStream}}, we can choose either {{v1}} or {{v2}} to produce a key (let's say {{v2}}): for the {{selectKey}} operation prior to the {{join}} if we are using a {{KTable}}, or for the {{join}} directly if we are using a {{GlobalKTable}}.
* In the {{KTable}} case the join will fail because the repartitioning will not be equivalent: the produced key is always serialized with {{v2}}, whereas {{table-topic}} contains a mix of {{v1}} and {{v2}} keys, so the same logical key can map to different bytes and therefore to different partitions.
* In the {{GlobalKTable}} case it will fail too, as described in the issue, because the {{byte[]}} key from {{table-topic}} is compared directly against the bytes of the produced key (see the lookup model below).
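The {{GlobalKTable}} failure mode can be modeled with a plain byte-keyed map, which is essentially how the byte-for-byte store lookup behaves (an illustrative model, not the actual store code; the framed key layout is simplified):
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ByteKeyLookup {
    public static void main(String[] args) {
        // Model of a byte-keyed state store: equality is byte-for-byte
        Map<ByteBuffer, String> store = new HashMap<>();

        // Pretend these are the registry-framed bytes of the SAME logical key,
        // written under schema id 1 (v1) and looked up under schema id 2 (v2)
        byte[] payload = "key".getBytes(StandardCharsets.UTF_8);
        byte[] v1Key = ByteBuffer.allocate(1 + 4 + payload.length)
                .put((byte) 0x0).putInt(1).put(payload).array();
        byte[] v2Key = ByteBuffer.allocate(1 + 4 + payload.length)
                .put((byte) 0x0).putInt(2).put(payload).array();

        store.put(ByteBuffer.wrap(v1Key), "value");

        System.out.println(store.get(ByteBuffer.wrap(v1Key))); // "value" -> hit
        // Lookup misses: the embedded schema id differs, so the bytes differ
        System.out.println(store.get(ByteBuffer.wrap(v2Key))); // null -> join fails
    }
}
{code}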

Regarding the Confluent Schema Registry, the problem is amplified because it embeds the schema id in the byte representation, so even adding a custom attribute to the schema (like {{avro.java.string}}, which won't even be serialized in the payload) will change the produced bytes.

How would you manage those cases? 


> Joins on GlobalKTable don't work properly when combined with Avro and the Confluent Schema Registry
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: Kafka, Avro, Confluent Schema Registry (3.2.1)
>            Reporter: Benjamin Bargeton
>
> A join between a {{KStream}} and a {{GlobalKTable}} does not work as expected with the following setup:
> * Use Kafka in combination with the Confluent Schema Registry
> * Feed a topic ({{my-global-topic}}) that will be used as the {{GlobalKTable}} input by posting some messages with an Avro {{GenericRecord}} as the key (using a traditional {{Producer}}/{{ProducerRecord}}, for example; a producer sketch follows the schema below).
> The simple Avro schema for the example:
> {code:javascript}
> {
>   "type": "record",
>   "name": "AvroKey",
>   "namespace": "com.test.key",
>   "fields": [
>     {
>       "name": "anyfield",
>       "type": "string"
>     }
>   ]
> }
> {code}
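> A minimal sketch of such a feeding producer (the broker/registry addresses and the plain {{String}} value are illustrative assumptions):
> {code:java}
> import java.util.Properties;
> 
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> 
> public class GlobalTopicFeeder {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");          // assumption
>         props.put("schema.registry.url", "http://localhost:8081"); // assumption
>         props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
>         props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
> 
>         Schema keySchema = new Schema.Parser().parse(
>                 "{\"type\":\"record\",\"name\":\"AvroKey\",\"namespace\":\"com.test.key\","
>               + "\"fields\":[{\"name\":\"anyfield\",\"type\":\"string\"}]}");
>         GenericRecord key = new GenericData.Record(keySchema);
>         key.put("anyfield", "some-key");
> 
>         try (Producer<Object, Object> producer = new KafkaProducer<>(props)) {
>             // The key bytes are framed with the schema id the registry assigned to THIS schema
>             producer.send(new ProducerRecord<>("my-global-topic", key, "some-value"));
>         }
>     }
> }
> {code}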
> * Start a Kafka Streams process that processes the messages, this time using an Avro {{SpecificRecord}} ({{AvroKey}}) generated by the Avro compiler for the same schema:
> {code:java}
> // Stream read with the app's serdes; the global table is keyed by AvroKey
> KStream<AnyKey, AnyObject> stream = builder.stream("my-stream-topic");
> GlobalKTable<AvroKey, AnyObject> globalTable = builder.globalTable("my-global-topic", "my-global-topic-store");
> stream
> 	// Maps each stream record to a lookup key, built here as a SpecificRecord
> 	.leftJoin(globalTable, (k, v) -> new AvroKey(v.getKeyOfTheGlobalTable()), (v1, v2) -> /*the ValueJoiner*/)
> 	.print("Result");
> {code}
> Note that the schema generated by Avro for the {{SpecificRecord}} differs slightly from the original one because we use {{String}} instead of {{CharSequence}} (an Avro compiler setting):
> {code:javascript}
> {
>   "type": "record",
>   "name": "AvroKey",
>   "namespace": "com.test.key",
>   "fields": [
>     {
>       "name": "anyfield",
>       "type": {
>         "type": "string",
>         "avro.java.string": "String"
>       }
>     }
>   ]
> }
> {code}
> * Last but not least, the Confluent Schema Registry serializer uses bytes 1-4 of the Avro-serialized payload to store the id of the schema registered in the schema registry (byte 0 is the magic byte):
> http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
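> For illustration, the schema id can be read straight from the serialized bytes, following the documented wire format (magic byte {{0x0}}, then a 4-byte big-endian id):
> {code:java}
> import java.nio.ByteBuffer;
> 
> // Reads the registry schema id from a Confluent-framed payload
> static int schemaIdOf(byte[] serialized) {
>     ByteBuffer buffer = ByteBuffer.wrap(serialized);
>     if (buffer.get() != 0x0) {      // byte 0: magic byte
>         throw new IllegalArgumentException("Not in Confluent wire format");
>     }
>     return buffer.getInt();         // bytes 1-4: schema id
> }
> {code}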
> Now our issue is that when the {{RocksDBStore}} of the {{GlobalKTable}} is initialized, it uses the {{byte[]}} straight from the consumed record's key:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L179
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L164
> The schemas for the producer and the streams app differ slightly (but are compatible), so they are registered with different global ids.
> Since the id is contained in the binary representation, the lookup fails during the join.
> I didn't test it, but the issue is probably broader than just this case: if we have an upstream producer that performs a schema evolution (with a backwards-compatible change), it should lead to the same issue.
> Please note that when using a {{KTable}} instead of a {{GlobalKTable}}, it works fine, because the key is first deserialized and then reserialized using the current serdes:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L197
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L198
> To conclude, I'm not sure I fully understand yet how all the pieces connect for state stores, but I assume that for a {{GlobalKTable}} there should also be a deserialization/reserialization of each key before storing it in RocksDB (at least to make the {{KTable}} and {{GlobalKTable}} behaviors coherent). A sketch of what I mean follows.
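> A minimal sketch of that normalization (hypothetical, not actual Kafka code): during restore, deserialize each key with the configured serde and reserialize it, so every stored key uses the byte representation of the streams app's own serializer:
> {code:java}
> import org.apache.kafka.common.serialization.Deserializer;
> import org.apache.kafka.common.serialization.Serializer;
> 
> // Hypothetical key normalization before writing to the RocksDB-backed global store
> static <K> byte[] normalizeKey(String topic, byte[] rawKeyBytes,
>                                Deserializer<K> keyDeserializer,
>                                Serializer<K> keySerializer) {
>     K key = keyDeserializer.deserialize(topic, rawKeyBytes);
>     // Reserializing uses the app's own (single) schema id, so identical logical
>     // keys always map to identical bytes, as already happens for a KTable
>     return keySerializer.serialize(topic, key);
> }
> {code}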


