You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Gordon Wang (Jira)" <ji...@apache.org> on 2020/07/09 07:16:00 UTC

[jira] [Created] (KAFKA-10252) MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores

Gordon Wang created KAFKA-10252:
-----------------------------------

             Summary: MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores
                 Key: KAFKA-10252
                 URL: https://issues.apache.org/jira/browse/KAFKA-10252
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.4.1
            Reporter: Gordon Wang


When creating a GlobalKTable for getting a ReadOnlyKeyValueStore likw below:
{code}
GlobalKTable<GenericRecord, GenericRecord> globalTable = streamsBuilder.globalTable(topic,
                Consumed.with(keySerde, valueSerde),
                Materialized.as(Stores.inMemoryKeyValueStore(topic)));
{code}
I got StreamsException like below:
{code}
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for topic applicationId-sourceTopicName-changelog
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
{code}
But as seen in GlobalKTable Java Doc the changelog stream shall not be created and in fact was not created.This leads to our custom serde to be searching for schema (we are using Confluent Platform and Avro based schema registry for the job) using a wrong topic name (should just be sourceTopicName rather than applicationId-sourceTopicName-changelog).
After digging into the code, I found *initStoreSerde* method in *MeteredTimestampedKeyValueStore* would assume the topic backing the store would always be storeChangelogTopic when initializing the Serdes for the state store, I think for GlobalKTables (ones having a GlobalProcessorContextImpl ProcessorContext) we shall use the original topic name directly here.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)