You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Andy Bryant (Jira)" <ji...@apache.org> on 2020/01/09 05:25:00 UTC

[jira] [Commented] (KAFKA-9390) Non-key joining of KTable not compatible with confluent avro serdes

    [ https://issues.apache.org/jira/browse/KAFKA-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011426#comment-17011426 ] 

Andy Bryant commented on KAFKA-9390:
------------------------------------

Here's the code snipped. I'm joining the orders table to the customers table using the customer key as the foreign key.
{code:java}
final KTable<dbserver1.inventory.customers.Key, dbserver1.inventory.customers.Value> customersTable =
    CdcHelpers.valueTable(streams.customers(), "customersTable");
final KTable<dbserver1.inventory.orders.Key, dbserver1.inventory.orders.Value> ordersTable =
    CdcHelpers.valueTable(streams.orders(), "ordersTable");
final KTable<dbserver1.inventory.orders.Key, OrderView> enrichedOrders =
    ordersTable.join(
      customersTable,
      order -> new dbserver1.inventory.customers.Key(order.getPurchaser()),
      (order, customer) ->
          OrderView.newBuilder()
            .setCustomerName(customer.getFirstName() + ' ' + customer.getLastName())
            .setOrderId(order.getOrderNumber())
            .setOrderDate(LocalDate.fromDateFields(Date.valueOf(java.time.LocalDate.ofEpochDay(order.getOrderDate()))))
            .setOrderQuantity(order.getQuantity())
            .setProductName("unknown")
            .setProductWeight(0.0d)
            .build(),
        Named.as("wtf"),
        AvroSerdes.materializedAs("ordersWithCustomerxx"));
{code}

> Non-key joining of KTable not compatible with confluent avro serdes
> -------------------------------------------------------------------
>
>                 Key: KAFKA-9390
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9390
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Andy Bryant
>            Priority: Major
>
> I was trying out the new one-to-many KTable joins against some CDC data in Avro format and kept getting serialisation errors.
>  
> {code:java}
> org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Key","namespace":"dbserver1.inventory.orders","fields":[
> {"name":"order_number","type":"int"}
> ],"connect.name":"dbserver1.inventory.orders.Key"}
>  Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
>   
> {code}
> Both tables have avro keys of different types (one is an order key, the other a customer key).
> This looks like it will cause issues.
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java#L57-L60]
>  They will both attempt to register schemas with the same subject to the schema registry which will fail a backward compatibility check.
> I also noticed in the schema registry there were some subjects that didn't have the application id prefix. This is probably caused by this...
>  [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java#L88]
> Where here {{repartitionTopicName}} doesn't have the application prefix.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)