Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2021/10/19 17:16:00 UTC

[jira] [Commented] (KAFKA-13373) ValueTransformerWithKeySupplier doesn't work with store()

    [ https://issues.apache.org/jira/browse/KAFKA-13373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430650#comment-17430650 ] 

Matthias J. Sax commented on KAFKA-13373:
-----------------------------------------

I did not look into it too deeply, but I guess it must be related to our internal wrappers. To unify runtime code, we wrap the different user interfaces so that the runtime only works with a reduced surface area, and the wrappers take care of translating between the interfaces. My suspicion is that some wrapper is not forwarding a `stores()` call correctly. If that's true, the fix itself should be simple; the tricky part is only to find the right place in the code...
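
A minimal sketch of that suspected failure mode, in plain Java. Every name here (`StoreProvider`, `UserSupplier`, `InternalSupplier`, `BrokenWrapper`, `ForwardingWrapper`) is a hypothetical stand-in chosen for illustration, not an actual Kafka Streams class, and stores are represented by their names only. It only shows how a wrapper that translates `get()` but does not override a default `stores()` method would silently hide the stores declared by the user's supplier:

```java
import java.util.Collections;
import java.util.Set;

public class WrapperForwardingSketch {

    // Hypothetical stand-in for a store-providing interface:
    // by default, a supplier declares no stores.
    interface StoreProvider {
        default Set<String> stores() {
            return null;
        }
    }

    // Hypothetical user-facing supplier interface.
    interface UserSupplier extends StoreProvider {
        String get();
    }

    // Hypothetical reduced interface that the runtime works with.
    interface InternalSupplier extends StoreProvider {
        String get();
    }

    // A user supplier that declares one store by name.
    static final class MySupplier implements UserSupplier {
        @Override
        public Set<String> stores() {
            return Collections.singleton("my-store");
        }

        @Override
        public String get() {
            return "transformer";
        }
    }

    // Suspected bug: get() is translated, but stores() is not overridden,
    // so the wrapper falls back to the default and returns null.
    static final class BrokenWrapper implements InternalSupplier {
        private final UserSupplier delegate;

        BrokenWrapper(final UserSupplier delegate) {
            this.delegate = delegate;
        }

        @Override
        public String get() {
            return delegate.get();
        }
    }

    // Possible shape of the fix: forward stores() to the wrapped supplier too.
    static final class ForwardingWrapper implements InternalSupplier {
        private final UserSupplier delegate;

        ForwardingWrapper(final UserSupplier delegate) {
            this.delegate = delegate;
        }

        @Override
        public String get() {
            return delegate.get();
        }

        @Override
        public Set<String> stores() {
            return delegate.stores();
        }
    }

    public static void main(final String[] args) {
        final UserSupplier user = new MySupplier();
        System.out.println(new BrokenWrapper(user).stores());     // prints "null"
        System.out.println(new ForwardingWrapper(user).stores()); // prints "[my-store]"
    }
}
```

If the real cause has this shape, the runtime would never see the store declared in `stores()`, which would be consistent with the "store is not connected to the processor" error reported below.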

> ValueTransformerWithKeySupplier doesn't work with store()
> ---------------------------------------------------------
>
>                 Key: KAFKA-13373
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13373
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Anatoly Tsyganenko
>            Priority: Minor
>              Labels: newbie
>
> I'm trying to use the stores() method in ValueTransformerWithKeySupplier like this:
>  
> {code:java}
> public final class CustomSupplier implements ValueTransformerWithKeySupplier<Windowed<String>, JsonNode, JsonNode> {
>     private final String storeName = "my-store";
>     @Override
>     public Set<StoreBuilder<?>> stores() {
>         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
>         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>         final Serde<String> stringSerde = Serdes.String();
>         final StoreBuilder<TimestampedKeyValueStore<String, JsonNode>> store = Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
>                         stringSerde, jsonSerde).withLoggingDisabled();
>         return Collections.singleton(store);
>     }
>     @Override
>     public ValueTransformerWithKey<Windowed<String>, JsonNode, JsonNode> get() {
>         return new ValueTransformerWithKey<Windowed<String>, JsonNode, JsonNode>() {
>             private ProcessorContext context;
>             private TimestampedKeyValueStore<String, JsonNode> store;
>             @Override
>             public void init(final ProcessorContext context) {
>                 this.store = context.getStateStore(storeName);
>                 this.context = context;
>             }
> //....
> }{code}
>  
> But I get the following error for the line "this.store = context.getStateStore(storeName);" in init():
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KTABLE-TRANSFORMVALUES-0000000008 has no access to StateStore my-store as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.{code}
>  
> The same code works perfectly with Transform or when I add the store to the builder. It looks like something goes wrong when ConnectedStoreProvider and ValueTransformerWithKeySupplier are used together.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)