You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Gordon Wang (Jira)" <ji...@apache.org> on 2020/07/09 07:16:00 UTC
[jira] [Created] (KAFKA-10252) MeteredTimestampedKeyValueStore not
setting up correct topic name for GlobalKTable associating StateStores
Gordon Wang created KAFKA-10252:
-----------------------------------
Summary: MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores
Key: KAFKA-10252
URL: https://issues.apache.org/jira/browse/KAFKA-10252
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.4.1
Reporter: Gordon Wang
When creating a GlobalKTable for getting a ReadOnlyKeyValueStore likw below:
{code}
GlobalKTable<GenericRecord, GenericRecord> globalTable = streamsBuilder.globalTable(topic,
Consumed.with(keySerde, valueSerde),
Materialized.as(Stores.inMemoryKeyValueStore(topic)));
{code}
I got StreamsException like below:
{code}
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for topic applicationId-sourceTopicName-changelog
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
{code}
But as seen in GlobalKTable Java Doc the changelog stream shall not be created and in fact was not created.This leads to our custom serde to be searching for schema (we are using Confluent Platform and Avro based schema registry for the job) using a wrong topic name (should just be sourceTopicName rather than applicationId-sourceTopicName-changelog).
After digging into the code, I found *initStoreSerde* method in *MeteredTimestampedKeyValueStore* would assume the topic backing the store would always be storeChangelogTopic when initializing the Serdes for the state store, I think for GlobalKTables (ones having a GlobalProcessorContextImpl ProcessorContext) we shall use the original topic name directly here.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)