Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2019/12/06 17:15:22 UTC

How to set concrete names for state stores and internal topics backed by these

Hi,
In my application I have names of internal topics like this:

ss-session-application-KSTREAM-JOINOTHER-0000000059-store-changelog-0
ss-session-application-KSTREAM-JOINTHIS-0000000049-store-changelog-0
ss-session-application-KSTREAM-OUTEROTHER-0000000050-store-changelog-0
ss-session-application-KTABLE-MERGE-STATE-STORE-0000000061-changelog-0

Is it possible to set concrete names for these, instead of generated ones
such as KSTREAM-JOINOTHER-0000000059-store?

That way I can identify which code in my DSL is responsible for the data
inside them.
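
As a rough illustration of why naming a store changes the topic name: Kafka Streams derives a changelog topic name from the application.id and the store name, in the form <application.id>-<store name>-changelog, and a repartition topic name in the form <application.id>-<name>-repartition. The sketch below only does that string arithmetic. The class and method names are hypothetical, the application.id is assumed from the prefix of the names listed above, and the trailing -0 in that listing is assumed to be a partition number rather than part of the topic name.

```java
public class InternalTopicNames {

    /** Changelog topic for a state store: <application.id>-<store name>-changelog. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /** Repartition topic for a named grouping or join: <application.id>-<name>-repartition. */
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        // Assumed application.id, taken from the prefix of the listed topic names.
        String appId = "ss-session-application";

        // A generated store name versus an explicitly chosen one.
        System.out.println(changelogTopic(appId, "KSTREAM-JOINOTHER-0000000059-store"));
        System.out.println(changelogTopic(appId, "d-l-i-store"));

        // A name such as the one passed to Grouped.with("group", ...).
        System.out.println(repartitionTopic(appId, "group"));
    }
}
```

Under these assumptions, every store that still has a generated name shows up with a KSTREAM-... or KTABLE-... segment in its changelog topic, which is one way to spot the operators that have not been named yet.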

So far I have set names for:
Grouped.with
Materialized.as
Joined.with

This has given me concrete names in many places, but in some places I
still see generated names.

Also note that this code compiles:
Materialized.with(new JSONSerde<K>(), new TupleJSONSerde<Pair<L, V>>())

But this does not:
Materialized.as("d-l-i-store")
    .withKeySerde(new JSONSerde<K>())
    .withValueSerde(new TupleJSONSerde<Pair<L, V>>())

The error I get is:
The method withKeySerde(Serde<Object>) in the type
Materialized<Object,Object,StateStore> is not applicable for the arguments
(JSONSerde<K>)

My serde class is declared as:

class JSONSerde<T extends JSONSerdeCompatible>
        implements Serializer<T>, Deserializer<T>, Serde<T> {
    ......
}

This is pretty much the same as in the Kafka Streams typed example.

Thanks
Sachin

Re: How to set concrete names for state stores and internal topics backed by these

Posted by John Roesler <vv...@apache.org>.
Hi Sachin,

I’m glad it helped!

What you have in mind is a good thing to do.

One thing to watch out for is _not_ to add names using Materialized for KTable operations that otherwise would not create a store. For example, if you filter or mapValues a KTable, those operations usually do not actually require storing any state. But if you add a name with Materialized, you’re telling Streams to actually create a state store and materialize the table. What I would do is write the topology without names first, then use topology.describe() to figure out which actual stores are needed, and then name them. This is an area I have some plans to improve. 

To answer your question, if you didn’t need serdes before, you don’t need them when you just use Materialized.as(name). Streams should continue to pass down the serdes through the program where it can. 

Hope this answers your question. 
-John

Re: How to set concrete names for state stores and internal topics backed by these

Posted by Sachin Mittal <sj...@gmail.com>.
Hi John,
This was very helpful. However, I am still confused about when to set the
names for Materialized and Grouped.
I am setting the names mainly so that state stores and internal topics have
definite names that are easy to identify when debugging.

So when we set a name, do we also need to set the serdes for the key/value
types? If not, what defaults are used?

I'll explain with a quick example.
My original code was:
table = stream.map((k, v) -> ...).groupByKey().reduce((av, nv) -> nv)

To give names to the intermediate stores/topics, I changed the code to:
table = stream
  .map((k, v) -> ...)
  .groupByKey(Grouped.with("group", Serde<K>, Serde<V>))
  .reduce((av, nv) -> nv, Materialized.as("store"))

So I wanted to know: once I create a named Materialized, do I need to set
its key/value serdes too?
Is this the better code?
table = stream
  .map((k, v) -> ...)
  .groupByKey(Grouped.with("group", Serde<K>, Serde<V>))
  .reduce((av, nv) -> nv,
      Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as("store-name")
          .withKeySerde(Serde<K>)
          .withValueSerde(Serde<V>))

Note that I have custom classes for the key and the value.

Thanks
Sachin

Re: How to set concrete names for state stores and internal topics backed by these

Posted by Patrik Kleindl <pk...@gmail.com>.
Hi Sachin

We are using a small helper method to keep this readable:

private <K, V, S extends StateStore> Materialized<K, V, S> materializedWith(
        String name, Serde<K> keySerde, Serde<V> valueSerde) {
    // Assigning to a typed variable gives the compiler a target type for K, V and S.
    Materialized<K, V, S> materialized = Materialized.as(name);
    return materialized.withKeySerde(keySerde).withValueSerde(valueSerde);
}

So the Materialized.as call just becomes:

materializedWith("storename", keySerde, valueSerde)

Hope that helps

Patrik

Re: How to set concrete names for state stores and internal topics backed by these

Posted by John Roesler <vv...@apache.org>.
Hi Sachin,

The way that Java infers generic arguments makes that case particularly obnoxious.

By the way, the problem you're facing is specifically addressed by these relatively new features:
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping

Since this behavior has been under development recently, I thought you might benefit from the context.

To answer your question, what you have to do is explicitly specify the type arguments to "Materialized.as(name)" when you're using withKeySerde, etc.

It will look something like this:

Materialized
  .<KeyType, ValueType, KeyValueStore<Bytes, byte[]>>as("store-name")
  .withKeySerde(new Serde<KeyType>...)
  .withValueSerde(new Serde<ValueType>...);

I can explain exactly why this is necessary if you want, but the short answer is that the Java type system only makes a rudimentary effort to infer types.
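
For readers who want to see the inference behaviour in isolation, here is a minimal sketch that needs no Kafka dependency. Store and Codec are hypothetical stand-ins that only mimic the shape of a fluent builder with a generic static factory; they are not Kafka classes. The point it tries to show is that the receiver of a method chain has no target type, so the type parameters fall back to Object unless an explicit type witness is given or the result is first assigned to a typed variable.

```java
public class TypeWitnessSketch {

    /** Hypothetical stand-in for a serde: turns a value of type T into text. */
    interface Codec<T> {
        String encode(T value);
    }

    /** Hypothetical stand-in for a fluent builder with a generic static factory. */
    static final class Store<K, V> {
        final String name;
        Codec<K> keyCodec;
        Codec<V> valueCodec;

        private Store(String name) {
            this.name = name;
        }

        // K and V appear only in the return type, so the compiler needs either
        // a target type or an explicit type witness to choose them.
        static <K, V> Store<K, V> as(String name) {
            return new Store<>(name);
        }

        Store<K, V> withKeyCodec(Codec<K> codec) {
            this.keyCodec = codec;
            return this;
        }

        Store<K, V> withValueCodec(Codec<V> codec) {
            this.valueCodec = codec;
            return this;
        }
    }

    // Does not compile: with no target type, Store.as("s") is a Store<Object, Object>,
    // so withKeyCodec expects a Codec<Object> and rejects a Codec<String>.
    //   Store.as("s").withKeyCodec(stringCodec);

    /** Option 1: an explicit type witness on the factory call. */
    static Store<String, Integer> withWitness(Codec<String> k, Codec<Integer> v) {
        return Store.<String, Integer>as("store-name").withKeyCodec(k).withValueCodec(v);
    }

    /** Option 2: assign first, so the declared variable type drives inference. */
    static <K, V> Store<K, V> named(String name, Codec<K> k, Codec<V> v) {
        Store<K, V> store = Store.as(name);
        return store.withKeyCodec(k).withValueCodec(v);
    }

    public static void main(String[] args) {
        Codec<String> stringCodec = s -> "str:" + s;
        Codec<Integer> intCodec = i -> "int:" + i;

        Store<String, Integer> a = withWitness(stringCodec, intCodec);
        Store<String, Integer> b = named("other-store", stringCodec, intCodec);

        System.out.println(a.name + " " + a.keyCodec.encode("k") + " " + a.valueCodec.encode(1));
        System.out.println(b.name + " " + b.keyCodec.encode("k") + " " + b.valueCodec.encode(2));
    }
}
```

Option 2 is the same idea as wrapping the call in a small generic helper method: the assignment supplies the type information that the bare method chain lacks.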

FWIW, this "paper cut" makes me irrationally angry, and I'm hoping we can find a way to fix it, if we ever change the Materialized builder interface.

Hope this helps,
-John