You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Adam Bellemare (Jira)" <ji...@apache.org> on 2020/03/24 01:15:00 UTC

[jira] [Resolved] (KAFKA-9732) Kafka Foreign-Key Joiner has unexpected default value used when a table is created via a stream+groupByKey+reduce

     [ https://issues.apache.org/jira/browse/KAFKA-9732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Adam Bellemare resolved KAFKA-9732.
-----------------------------------
    Resolution: Not A Problem

Issue was with reporter's usage of the API.

> Kafka Foreign-Key Joiner has unexpected default value used when a table is created via a stream+groupByKey+reduce
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9732
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9732
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Adam Bellemare
>            Priority: Major
>
> I'm upgrading some internal business code that used to use a prototype version of the FKJoiner, migrating to the 2.4.1 Kafka release. I am running into an issue where the joiner is using the default Serde, despite me clearly specifying NOT to use the default serde (unless I am missing something!). Currently, this is how I generate the left KTable, used in the _*leftTable.join(rightTable, ...)*_ FKJoin.
> Let's call this process 1:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ...//Confluend Kafka S.R. value serde
> val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)
> //For wrapping nulls in mapValues below
> case class OptionalDeletable[T](elem: Option[T])
> //Internal Serdes that do NOT use the SR
> //Same serde logic as externalMyKeySerde, but doesn't register schemas to schema registry.
> val internalMyKeySerde = ... 
> //Same serde logic as externalMyValueSerde, but doesn't register schemas to schema registry.
> val internalOptionalDeletableMyValueSerde: Serde[OptionalDeletable[MyValue]] = ... 
> val myLeftTable: KTable[MyKey, MyValue] =
>       streamBuilder.stream[MyKey, MyValue]("inputTopic")(myConsumer)
>         .mapValues(
>           v => {
>             //We need the nulls to propagate deletes.
>             //Wrap this in a simple case-class because we can't groupByKey+reduce null values as they otherwise get filtered out. 
>             OptionalDeletable(Some(v))
>           }
>         )
>         .groupByKey(Grouped.`with`(internalMyKeySerde, internalOptionalDeletableMyValueSerde))
>         .reduce((_,x) => x)(
>             Materialized.as("myLeftTable")(internalMyKeySerde, internalOptionalDeletableMyValueSerde))
>         .mapValues(v => v.elem.get) //Unwrap the element
> {code}
> Next, we create the right table and specify the FKjoining logic
> {code:scala}
> //This is created in an identical way to Process 1... I wont show it here for brevity.
> val rightTable: KTable[RightTableKey, RightTableValue] = streamBuilder.table(...)
> //Not showing previous definitions because I don't think they're relevant to this issue...
> val itemMaterialized =
>     Materialized.as[MyKey, JoinedOutput, KeyValueStore[Bytes, Array[Byte]]]("materializedOutputTable")(
>       internalMyKeySerde, internalJoinedOutputSerde)
> val joinedTable = myLeftTable.join[JoinedOutput, RightTableKey, RightTableValue](
>       rightTable, foreignKeyExtractor, joinerFunction, materializedOutputTable)
> //Force evaluation to output some data
> joinedTable.toStream.to("outputStream")
> {code}
> When I execute this with leftTable generated via process 1, I end up somehow losing the leftTable serde along the way and end up falling back onto the default serde. This results in a runtime exception as follows:
> {code:java}
> <removed for brevity>
> Caused by: java.lang.ClassCastException: com.bellemare.sample.MyValue cannot be cast to [B
> 	at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
> 	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:94)
> 	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
> 	... 30 more
> {code}
> Now, if I change process 1 to the following:
> Process 2:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ...//Confluend Kafka S.R. value serde
> val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)
> val myLeftTable: KTable[MyKey, MyValue] =
>       streamBuilder.table[MyKey, MyValue]("inputTopic")(myConsumer)
> //The downside of this approach is that we end up registering a bunch of internal topics to the schema registry (S.R.), significantly increasing the clutter in our lookup UI.
> {code}
> Everything works as expected, and the expected `_*externalMyValueSerde*_` is used to serialize the events (though I don't want this, as it registers to the SR and clutters it up).
> I don't think I'm missing any Serdes inputs anywhere in the DSL, but I'm having a hard time figuring out *if this is normal existing behaviour for how a KTable is created via* *Process 1* or if I'm stumbling upon a bug somewhere. When I try to debug my way through this, the FKJoiner appears to use `_*valSerde = null*_` (and therefore fall back to the default Serde) for the KTable created via process 1. This is unexpected to me, I was expected to see `_*valSerde = internalOptionalDeletableMyValueSerde*_` instead.
> Is this a bug, or is this a problem with something that I am doing unwittingly?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)