Posted to jira@kafka.apache.org by "Adam Bellemare (Jira)" <ji...@apache.org> on 2020/03/18 17:15:00 UTC

[jira] [Comment Edited] (KAFKA-9732) Kafka Foreign-Key Joiner has unexpected default value used when a table is created via a stream+groupByKey+reduce

    [ https://issues.apache.org/jira/browse/KAFKA-9732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061921#comment-17061921 ] 

Adam Bellemare edited comment on KAFKA-9732 at 3/18/20, 5:14 PM:
-----------------------------------------------------------------

[~vvcephei]

Hey John

Not sure if this is a known issue with Process 1 as described above, as-designed behaviour, or a bug deeper in the joiner code. I don't think it's the last of these, since Process 2 works just fine. It seems to come down to HOW a KTable is created.

Anyway, I'm tagging you because I think you have the most context on the serdes and the FKJ, given the recent fixes you made in 2.4.1 and the help you gave me on the original KIP.


was (Author: abellemare):
[~vvcephei]

Hey John

Not sure if this is a known-issue with Process 1 as described above, an as-designed/feature, or if I have a bug deeper in the joiner code. I don't think it's the latter as Process 2 works just fine. It seems to be around HOW a KTable is created.

> Kafka Foreign-Key Joiner has unexpected default value used when a table is created via a stream+groupByKey+reduce
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9732
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9732
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0, 2.4.1
>            Reporter: Adam Bellemare
>            Priority: Major
>
> I'm upgrading some internal business code that used to use a prototype version of the FKJoiner, migrating to the 2.4.1 Kafka release. I am running into an issue where the joiner is using the default Serde, despite me clearly specifying NOT to use the default serde (unless I am missing something!). Currently, this is how I generate the left KTable, used in the _*leftTable.join(rightTable, ...)*_ FKJoin.
> Let's call this process 1:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
> val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)
> //For wrapping nulls in mapValues below
> case class OptionalDeletable[T](elem: Option[T])
> //Internal Serdes that do NOT use the SR
> //Same serde logic as externalMyKeySerde, but doesn't register schemas to schema registry.
> val internalMyKeySerde = ... 
> //Same serde logic as externalMyValueSerde, but doesn't register schemas to schema registry.
> val internalOptionalDeletableMyValueSerde: Serde[OptionalDeletable[MyValue]] = ... 
> val myLeftTable: KTable[MyKey, MyValue] =
>       streamBuilder.stream[MyKey, MyValue]("inputTopic")(myConsumer)
>         .mapValues(
>           v => {
>             //We need the nulls to propagate deletes.
>             //Wrap this in a simple case-class because we can't groupByKey+reduce null values as they otherwise get filtered out. 
>             OptionalDeletable(Some(v))
>           }
>         )
>         .groupByKey(Grouped.`with`(internalMyKeySerde, internalOptionalDeletableMyValueSerde))
>         .reduce((_,x) => x)(
>             Materialized.as("myLeftTable")(internalMyKeySerde, internalOptionalDeletableMyValueSerde))
>         .mapValues(v => v.elem.get) //Unwrap the element
> {code}
> Next, we create the right table and specify the FKjoining logic
> {code:scala}
> //This is created in an identical way to Process 1... I won't show it here for brevity.
> val rightTable: KTable[RightTableKey, RightTableValue] = streamBuilder.table(...)
> //Not showing previous definitions because I don't think they're relevant to this issue...
> val itemMaterialized =
>     Materialized.as[MyKey, JoinedOutput, KeyValueStore[Bytes, Array[Byte]]]("materializedOutputTable")(
>       internalMyKeySerde, internalJoinedOutputSerde)
> val joinedTable = myLeftTable.join[JoinedOutput, RightTableKey, RightTableValue](
>       rightTable, foreignKeyExtractor, joinerFunction, itemMaterialized)
> //Force evaluation to output some data
> joinedTable.toStream.to("outputStream")
> {code}
> When I execute this with leftTable generated via process 1, the leftTable serde is somehow lost along the way and the join falls back to the default serde. This results in a runtime exception as follows:
> {code:java}
> <removed for brevity>
> Caused by: java.lang.ClassCastException: com.bellemare.sample.MyValue cannot be cast to [B
> 	at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
> 	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:94)
> 	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
> 	... 30 more
> {code}
> Now, if I change process 1 to the following:
> Process 2:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
> val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)
> val myLeftTable: KTable[MyKey, MyValue] =
>       streamBuilder.table[MyKey, MyValue]("inputTopic")(myConsumer)
> //The downside of this approach is that we end up registering a bunch of internal topics to the schema registry (S.R.), significantly increasing the clutter in our lookup UI.
> {code}
> Everything works as expected, and `_*externalMyValueSerde*_` is used to serialize the events (though I don't want this, as it registers schemas with the SR and clutters it up).
> I don't think I'm missing any Serde inputs anywhere in the DSL, but I'm having a hard time figuring out *if this is normal existing behaviour for how a KTable is created via* *Process 1* or if I'm stumbling upon a bug somewhere. When I try to debug my way through this, the FKJoiner appears to use `_*valSerde = null*_` (and therefore falls back to the default Serde) for the KTable created via process 1. This is unexpected to me: I expected to see `_*valSerde = internalOptionalDeletableMyValueSerde*_` instead.
> Is this a bug, or is this a problem with something that I am doing unwittingly?
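
The failure mode described above (a null value serde, a fallback to the default serde, then a ClassCastException inside the byte-array serializer) can be modelled without Kafka. The following is only a minimal sketch under stated assumptions: `Serializer`, `ByteArraySerializer`, `MyValueSerializer`, `resolve` and `SerdeFallbackSketch` are hypothetical stand-ins written for this illustration, not the Kafka Streams classes, and the real joiner code may resolve its serdes differently.

```scala
object SerdeFallbackSketch {
  // Hypothetical stand-ins for illustration only; these are NOT Kafka classes.
  trait Serializer[T] { def serialize(data: T): Array[Byte] }

  // Plays the role of a default serde that only accepts raw byte arrays.
  object ByteArraySerializer extends Serializer[Array[Byte]] {
    def serialize(data: Array[Byte]): Array[Byte] = data
  }

  final case class MyValue(id: Int)

  // Plays the role of an explicitly supplied value serde.
  object MyValueSerializer extends Serializer[MyValue] {
    def serialize(data: MyValue): Array[Byte] = data.id.toString.getBytes("UTF-8")
  }

  // Models a processor that uses the explicit serializer when one was passed
  // along, and otherwise falls back to the configured default. The cast is
  // unchecked because of type erasure, so a mismatch only surfaces at runtime.
  def resolve[T](explicit: Serializer[T], default: Serializer[_]): Serializer[T] =
    if (explicit != null) explicit else default.asInstanceOf[Serializer[T]]

  // Explicit serializer present: serialization succeeds.
  val ok: Array[Byte] =
    resolve(MyValueSerializer, ByteArraySerializer).serialize(MyValue(1))

  // Explicit serializer missing (null): the byte-array default is handed a
  // MyValue and fails with a ClassCastException, captured here in a Try.
  val failure: scala.util.Try[Array[Byte]] = scala.util.Try(
    resolve[MyValue](null, ByteArraySerializer).serialize(MyValue(1))
  )

  def main(args: Array[String]): Unit = {
    println(new String(ok, "UTF-8"))
    println(failure)
  }
}
```

In this model the mismatch is invisible at compile time and only shows up when the first record is serialized, which is consistent with the exception appearing at runtime in the stack trace rather than when the topology is built.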


