Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2020/05/12 09:35:16 UTC

data structures used by GlobalKTable, KTable

Hello Confluent team,

Could you provide some information on what data structures are used
internally by GlobalKTable and KTable? The application I am working on
needs to read cached data from a GlobalKTable on every incoming event, so
reads from the GlobalKTable need to be efficient.

Re: data structures used by GlobalKTable, KTable

Posted by "Matthias J. Sax" <mj...@apache.org>.
Your request is out of scope for the KIP. Sorry.

Kafka Streams does ship with an in-memory LRU store; however, this
store would never go back to the topic. It just caches based on LRU, and
if data is evicted, it's gone.
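To make the "evicted means gone" behaviour concrete, here is a minimal JDK-only sketch of an LRU map. It is not Kafka code. It only assumes that the Kafka Streams LRU store (created via `Stores.lruMap(name, maxCacheSize)`) behaves like a size-bounded, access-ordered map. The class name `LruMap` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration (not Kafka code): a size-bounded map that evicts the
// least recently used entry. Evicted entries are simply dropped; nothing is
// reloaded from any backing topic.
class LruMap<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    LruMap(int maxEntries) {
        // accessOrder = true: get() moves an entry to the most recently used end.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Invoked after each put(); returning true removes the least recently used entry.
        return size() > maxEntries;
    }
}
```

With a limit of 2, reading `a` before inserting `c` keeps `a` resident and evicts `b`. A later `get("b")` then returns `null` instead of going back to any source.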

Going back to the topic would not be feasible anyway, because there is
no index on the key; reloading a specific key from the topic would
require a full scan of the whole topic.
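The cost argument can be sketched in plain Java. Without an index on the key, recovering the current value of one key from a log means reading every record and keeping the last value seen for that key. This is a hypothetical illustration, not Kafka API. `TopicRecord` stands in for a topic record, its list position stands in for the offset, and `LogScan.latestValue` is an invented helper.

```java
import java.util.List;

// Hypothetical stand-in for a topic record; list position plays the role of the offset.
final class TopicRecord {
    final String key;
    final String value;

    TopicRecord(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

final class LogScan {
    // Result of a lookup: the latest value for the key plus how many records were read.
    static final class Result {
        final String value;
        final int recordsRead;

        Result(String value, int recordsRead) {
            this.value = value;
            this.recordsRead = recordsRead;
        }
    }

    // With no key index, every record must be read: a later record for the same key
    // overrides an earlier one, so the scan cannot stop at the first match.
    static Result latestValue(List<TopicRecord> log, String key) {
        String latest = null;
        int read = 0;
        for (TopicRecord r : log) {
            read++;
            if (r.key.equals(key)) {
                latest = r.value;
            }
        }
        return new Result(latest, read);
    }
}
```

Every lookup, hit or miss, reads all records. That is why a bounded cache cannot cheaply fall back to the topic.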

Thus, if you have data that does not fit into main memory, you should
use the default RocksDB store.


-Matthias



On 5/16/20 10:10 PM, Pushkar Deole wrote:
> If my thinking is correct then for some scenarios or use cases, the MRU for
> GlobalKTable might also work as a local store of data since the MRU will
> always store the data required by that application instance.


Re: data structures used by GlobalKTable, KTable

Posted by Pushkar Deole <pd...@gmail.com>.
If my thinking is correct, then for some use cases an MRU cache for the
GlobalKTable might also work as a local store of data, since the MRU cache
would always hold the data required by that application instance.

On Sun, May 17, 2020 at 9:42 AM Pushkar Deole <pd...@gmail.com> wrote:

> Matthias,
>
> I would like to provide a suggestion here. Please check if this can be
> converted into a KIP. Since GlobalKTable holds complete topic data, and
> when the store underneath is in-memory store then the data in memory can
> quickly grow to a large value. I think it would be good if while using
> GlobalKTable with in-memory store, the memory limit (or no. of events) can
> also be specified in which case the GlobalKTable will hold only that much
> data in memory and rest of the data will be fetched from topic.
> On top of it, the GlobalKTable can also be converted into most recently
> used cache so whatever memory size is allocated to the table, it will
> always hold the MRU on that cache.

Re: data structures used by GlobalKTable, KTable

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,

I would like to provide a suggestion here. Please check whether this can be
converted into a KIP. Since a GlobalKTable holds the complete topic data,
the data in memory can quickly grow large when the underlying store is an
in-memory store. I think it would be good if, when using a GlobalKTable
with an in-memory store, a memory limit (or maximum number of events) could
also be specified. The GlobalKTable would then hold only that much data in
memory, and the rest of the data would be fetched from the topic.
On top of that, the GlobalKTable could be turned into a most recently used
(MRU) cache, so that whatever memory size is allocated to the table, it
always holds the most recently used entries.
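The proposal can be sketched as a size-bounded cache. It keeps the most recently used entries and falls back to a loader on a miss. This is a hypothetical JDK-only illustration, not an existing Kafka Streams feature. `ReadThroughCache` and the `loader` function are assumptions. In a real system the loader would have to re-read the topic, which, as noted earlier in this thread, means a full scan because there is no key index.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the proposed bounded store: keep at most maxEntries
// recently used entries in memory and fetch anything else via a loader.
final class ReadThroughCache<K, V> {
    private final Function<K, V> loader; // stands in for "fetch from the topic"
    private final Map<K, V> entries;
    private int loads = 0;

    ReadThroughCache(int maxEntries, Function<K, V> loader) {
        this.loader = loader;
        // Access-ordered LinkedHashMap that drops the least recently used entry
        // once the configured limit is exceeded.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    V get(K key) {
        V cached = entries.get(key);
        if (cached != null) {
            return cached;
        }
        // Cache miss: go to the (expensive) backing source and keep the result.
        loads++;
        V loaded = loader.apply(key);
        if (loaded != null) {
            entries.put(key, loaded);
        }
        return loaded;
    }

    // Number of times the backing source was consulted.
    int loadCount() {
        return loads;
    }
}
```

This sketch treats a `null` value as a miss, so deleted keys would be reloaded on every read. A real design would need to handle tombstones explicitly.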

On Thu, May 14, 2020 at 11:49 PM Matthias J. Sax <mj...@apache.org> wrote:

> Yeah, the current API doesn't make it very clear how to do it. You can
> set an in-memory like this:
>
> > builder.globalTable("topic", Materialized.as(Stores.inMemoryKeyValueStore("store-name")));
>
>
> We are already working on an improved API via KIP-591:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type
>
>
>
> -Matthias

Re: data structures used by GlobalKTable, KTable

Posted by "Matthias J. Sax" <mj...@apache.org>.
By default, RocksDB is used, i.e., the following are the same:

builder.globalTable("topic");

builder.globalTable(
  "topic",
  Materialized.as(Stores.persistentKeyValueStore("store-name"))
);


-Matthias

On 5/22/20 4:31 AM, Pushkar Deole wrote:
> Matthias,
> 
> I can see that when the Materialized parameter is provided with store-name,
> the store can be accessed through kafkaStreams.store(store-name). This
> returns ReadOnlyKeyValueStore which seems to hold the Map structure
> internally. So, the GlobalKTable API is just an abstraction and if I
> haven't provided Materialized parameter while constructing GlobalKTable
> then which store does it use?


Re: data structures used by GlobalKTable, KTable

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,

I can see that when the Materialized parameter is provided with a store name,
the store can be accessed through kafkaStreams.store(store-name). This
returns a ReadOnlyKeyValueStore, which seems to hold a Map structure
internally. So the GlobalKTable API is just an abstraction. If I don't
provide a Materialized parameter when constructing the GlobalKTable, which
store does it use?

On Thu, May 14, 2020 at 11:49 PM Matthias J. Sax <mj...@apache.org> wrote:

> Yeah, the current API doesn't make it very clear how to do it. You can
> set an in-memory like this:
>
> > builder.globalTable("topic", Materialized.as(Stores.inMemoryKeyValueStore("store-name")));
>
>
> We are already working on an improved API via KIP-591:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type
>
>
>
> -Matthias

Re: data structures used by GlobalKTable, KTable

Posted by Pushkar Deole <pd...@gmail.com>.
Thanks, yes, that would help.

On Thu, May 14, 2020 at 11:49 PM Matthias J. Sax <mj...@apache.org> wrote:

> Yeah, the current API doesn't make it very clear how to do it. You can
> set an in-memory like this:
>
> > builder.globalTable("topic", Materialized.as(Stores.inMemoryKeyValueStore("store-name")));
>
>
> We are already working on an improved API via KIP-591:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type
>
>
>
> -Matthias

Re: data structures used by GlobalKTable, KTable

Posted by "Matthias J. Sax" <mj...@apache.org>.
Yeah, the current API doesn't make it very clear how to do it. You can
set an in-memory store like this:

  builder.globalTable("topic", Materialized.as(Stores.inMemoryKeyValueStore("store-name")));


We are already working on an improved API via KIP-591:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type



-Matthias


On 5/13/20 3:40 AM, Pushkar Deole wrote:
> Matthias,
> 
> For GlobalKTable, I am looking at the APIs provided by StreamsBuilder, and I
> don't see any option to specify an in-memory store there. All of the API
> documentation states that "The resulting GlobalKTable
> <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/GlobalKTable.html>
> will be materialized in a local KeyValueStore
> <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/state/KeyValueStore.html>
> with an internal store name." It doesn't give an option for whether the
> store is in-memory or backed by a DB:
>
> globalTable(String topic)
> <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#globalTable-java.lang.String->
> globalTable(String topic, Consumed<K,V> consumed,
>     Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
> <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#globalTable-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.kstream.Materialized->
> 
> On Tue, May 12, 2020 at 11:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> By default, RocksDB is used. You can also change it to use an in-memory
>> store that is basically a HashMap.
>>
>>
>> -Matthias
>>
>> On 5/12/20 10:16 AM, Pushkar Deole wrote:
>>> Thanks Liam!
>>>
>>> On Tue, May 12, 2020, 15:12 Liam Clarke-Hutchinson <
>>> liam.clarke@adscale.co.nz> wrote:
>>>
>>>> Hi Pushkar,
>>>>
>>>> GlobalKTables and KTables can have whatever data structure you like, if
>>>> you provide the appropriate deserializers. For example, a Kafka Streams
>>>> app I maintain stores model data (exported to a topic per entity from
>>>> Postgres via Kafka Connect's JDBC Source) as a GlobalKTable of Jackson
>>>> ObjectNodes keyed by entity id.
>>>>
>>>> If you're worried about efficiency, just treat KTables/GlobalKTables as a
>>>> HashMap<K, V> and you're pretty much there. In terms of efficiency,
>>>> we're joining model data to about 7-10 TB of transactional data a day,
>>>> and on average we run about 5-10 instances of our enrichment app with
>>>> about 2GB max heap.
>>>>
>>>> Kind regards,
>>>>
>>>> Liam "Not a part of the Confluent team, but happy to help"
>>>> Clarke-Hutchinson
>>>>
>>>> On Tue, May 12, 2020 at 9:35 PM Pushkar Deole <pd...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello confluent team,
>>>>>
>>>>> Could you provide some information on what data structures are used
>>>>> internally by GlobalKTable and KTables. The application that I am working
>>>>> on has a requirement to read cached data from GlobalKTable on every
>>>>> incoming event, so the reads from GlobalKTable need to be efficient.
>>>>>
>>>>
>>>
>>
>>
> 


Re: data structures used by GlobalKTable, KTable

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,

For GlobalKTable, I am looking at the APIs provided by StreamsBuilder and I
don't see any option to specify an in-memory store there. The documentation
for all of these APIs states that "The resulting GlobalKTable
<https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/GlobalKTable.html>
will be materialized in a local KeyValueStore
<https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/state/KeyValueStore.html>
with an internal store name." It doesn't give an option to choose between
in-memory and database-backed storage.

globalTable(String topic)
<https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#globalTable-java.lang.String->

globalTable(String topic, Consumed<K,V> consumed,
    Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
<https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#globalTable-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.kstream.Materialized->


Re: data structures used by GlobalKTable, KTable

Posted by "Matthias J. Sax" <mj...@apache.org>.
By default, RocksDB is used. You can also change it to use an in-memory
store that is basically a HashMap.
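
Matthias's suggestion maps onto the second overload listed above: the Materialized argument can carry a store supplier, and Stores.inMemoryKeyValueStore(...) supplies an in-memory store in place of the default RocksDB one. A minimal sketch, assuming Kafka Streams 2.x or later, String keys and values, and the hypothetical topic "reference-data" and store name "reference-data-store":

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class InMemoryGlobalTableExample {
    // Hypothetical topic and store names, for illustration only.
    static final String TOPIC = "reference-data";
    static final String STORE_NAME = "reference-data-store";

    static GlobalKTable<String, String> buildGlobalTable(StreamsBuilder builder) {
        return builder.globalTable(
                TOPIC,
                Consumed.with(Serdes.String(), Serdes.String()),
                // Passing an in-memory store supplier replaces the default RocksDB store.
                Materialized.<String, String>as(Stores.inMemoryKeyValueStore(STORE_NAME))
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));
    }

    // Builds a topology containing only the global table and returns its description.
    static String describeTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        buildGlobalTable(builder);
        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describeTopology());
    }
}
```

With this choice every application instance keeps the whole topic on its JVM heap, and because nothing is persisted locally the store has to be repopulated from the topic when the instance starts.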


-Matthias



Re: data structures used by GlobalKTable, KTable

Posted by Pushkar Deole <pd...@gmail.com>.
Thanks Liam!


Re: data structures used by GlobalKTable, KTable

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Pushkar,

GlobalKTables and KTables can have whatever data structure you like, if you
provide the appropriate deserializers. For example, a Kafka Streams app I
maintain stores model data (exported to a topic per entity from Postgres
via Kafka Connect's JDBC Source) as a GlobalKTable of Jackson ObjectNodes
keyed by entity id.
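
A minimal sketch of this kind of setup, not the actual app's code: a Serde for Jackson ObjectNode values built from lambdas, which assumes Kafka 2.1 or later (where Serializer and Deserializer have default configure/close methods) and jackson-databind on the classpath. The topic name "entity-models" and the String entity-id keys are hypothetical.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;

import java.io.IOException;

public class ObjectNodeSerdeSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Serde that converts JSON bytes to Jackson ObjectNodes and back.
    static Serde<ObjectNode> objectNodeSerde() {
        Serializer<ObjectNode> serializer = (topic, node) -> {
            if (node == null) {
                return null; // keep tombstones as null
            }
            try {
                return MAPPER.writeValueAsBytes(node);
            } catch (IOException e) {
                throw new SerializationException("Could not serialize ObjectNode", e);
            }
        };
        Deserializer<ObjectNode> deserializer = (topic, bytes) -> {
            if (bytes == null) {
                return null; // null value: the table treats the record as a delete
            }
            try {
                JsonNode node = MAPPER.readTree(bytes);
                if (node == null || !node.isObject()) {
                    throw new SerializationException("Expected a JSON object on topic " + topic);
                }
                return (ObjectNode) node;
            } catch (IOException e) {
                throw new SerializationException("Could not deserialize ObjectNode", e);
            }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }

    // Hypothetical topic name; keys are assumed to be entity ids as Strings.
    static GlobalKTable<String, ObjectNode> modelTable(StreamsBuilder builder) {
        return builder.globalTable("entity-models", Consumed.with(Serdes.String(), objectNodeSerde()));
    }
}
```

Null bytes are passed through as null so that tombstone records still remove keys from the table, and JSON that is not an object is rejected rather than silently cast.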

If you're worried about efficiency, just treat KTables/GlobalKTables as a
HashMap<K, V> and you're pretty much there. For a sense of scale, we're
joining model data to about 7-10 TB of transactional data a day, and on
average run about 5-10 instances of our enrichment app with about 2 GB
max heap.
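
The "treat it as a HashMap" advice corresponds to a KStream-GlobalKTable join: for each incoming event, Kafka Streams looks up a single key in the local store. A sketch under stated assumptions: hypothetical topics "transactions", "entity-models" and "enriched-transactions", String keys and values, and transactions that are already keyed by entity id.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class EnrichmentSketch {
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Reference ("model") data: every instance holds a full copy.
        GlobalKTable<String, String> models =
                builder.globalTable("entity-models", Consumed.with(Serdes.String(), Serdes.String()));

        // Transactions, assumed to be keyed by the same entity id as the models.
        KStream<String, String> transactions =
                builder.stream("transactions", Consumed.with(Serdes.String(), Serdes.String()));

        // For each transaction, look up one key in the local store and combine the values.
        KStream<String, String> enriched = transactions.join(
                models,
                (txKey, txValue) -> txKey,                     // key to look up in the GlobalKTable
                (txValue, model) -> txValue + " | " + model);  // merge event and model

        enriched.to("enriched-transactions", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

This is an inner join, so events whose key has no entry in the table are dropped; KStream#leftJoin with the same arguments keeps them and passes a null model to the joiner. If transactions are keyed differently, the key selector can extract the entity id from the value instead.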

Kind regards,

Liam "Not a part of the Confluent team, but happy to help" Clarke-Hutchinson
