Posted to users@kafka.apache.org by Jonathan Roy <jo...@caldera.com.INVALID> on 2018/07/11 08:41:58 UTC

Questions about state stores and KSQL

Hi Kafka users,

I am very new to Kafka, and to stream processing in general, and am trying to understand some of the concepts used by Kafka. From what I understand, a key-value state store is created on each processor node that performs stateful operations such as aggregations or joins. Let’s take an example. I have an ‘orders’ stream and a ‘users’ table, and I want to enrich the order events with the corresponding user information, using the KSQL CLI:

CREATE STREAM orders_enriched AS
  SELECT o.id, o.article, o.quantity, o.userId, u.name, u.address, u.email
  FROM orders o
  LEFT JOIN users u ON o.userId = u.id;

Where is the state store located in this case? What exactly will it contain? Is it possible to query it from another node?

Thanks in advance for your help!

Jonathan 

Re: Questions about state stores and KSQL

Posted by "Matthias J. Sax" <ma...@confluent.io>.
To understand joins better, you might want to check out:
https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

KSQL uses the same join semantics as Kafka Streams.


-Matthias




Re: Questions about state stores and KSQL

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Jonathan,

At a very high level, KSQL statements are compiled into a Kafka Streams
topology for execution. The concept of "state stores" belongs to Kafka
Streams, not to KSQL: inside the topology, processor nodes that need
stateful processing, such as joins, have one or more state stores
associated with them.

Back to your example, this KSQL statement will be compiled into a Kafka
Streams topology that roughly looks like this:

----------

Kafka topic that defines stream "orders" --> source node --> join node
(queries the "users-state" store, as generated below) --> sink node --> Kafka
topic that defines stream "orders_enriched"

Kafka topic that defines table "users" --> source node --> materialization
node (associated with a state store, let's name it "users-state")

----------

That is, one state store is used to materialize the changelog stream of the
"users" table, and each record of the other stream ("orders") is looked up
against that store.
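To make the two branches above concrete, here is a minimal plain-Java simulation. It does not use the Kafka Streams API: the class name, the method names, and the string-valued records are all made up for illustration, and a HashMap stands in for the "users-state" store. It only shows the idea that table records update the store while stream records read from it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation; none of these names come from the Kafka Streams API.
final class StreamTableJoinSketch {

    // Stand-in for the "users-state" store: latest value per user id.
    private final Map<String, String> usersState = new HashMap<>();

    // Stand-in for the "orders_enriched" output topic.
    private final List<String> ordersEnriched = new ArrayList<>();

    // Materialization node: apply one changelog record of the "users" table.
    // A null value is treated as a delete (tombstone).
    void onUserRecord(String userId, String userInfo) {
        if (userInfo == null) {
            usersState.remove(userId);
        } else {
            usersState.put(userId, userInfo);
        }
    }

    // Join node: look up the order's userId in the store and emit one output
    // record. A LEFT JOIN keeps the order even when no user is found.
    void onOrderRecord(String orderId, String userId) {
        String userInfo = usersState.get(userId);
        ordersEnriched.add(orderId + " -> " + userInfo);
    }

    List<String> output() {
        return ordersEnriched;
    }

    public static void main(String[] args) {
        StreamTableJoinSketch topology = new StreamTableJoinSketch();
        topology.onUserRecord("u1", "Alice");
        topology.onOrderRecord("o1", "u1"); // user known
        topology.onOrderRecord("o2", "u2"); // user unknown: joined side is null
        topology.onUserRecord("u2", "Bob"); // table update alone emits nothing
        topology.onOrderRecord("o3", "u2");
        for (String line : topology.output()) {
            System.out.println(line);
        }
    }
}
```

In this sketch only order records produce output; a user record changes what later orders will see but emits nothing by itself.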

In Kafka Streams, you can query a state store following the interactive
query mechanism:

https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html

Interactive queries are not supported in KSQL yet.
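On the "query it from another node" part: in Kafka Streams each application instance holds only the part of a store that belongs to the partitions assigned to it, so a lookup has to reach the instance that owns the key. The plain-Java simulation below illustrates that idea only. It is not the interactive-query API; the class and method names are hypothetical, and the hash-based ownerOf is a stand-in for Kafka's real partitioning of the serialized key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation; the class and method names are hypothetical.
final class PartitionedStoreSketch {

    // One local store per application instance.
    private final List<Map<String, String>> localStores = new ArrayList<>();

    PartitionedStoreSketch(int instances) {
        for (int i = 0; i < instances; i++) {
            localStores.add(new HashMap<String, String>());
        }
    }

    // Stand-in for key-based partitioning (assumption: one partition per
    // instance; real Kafka hashes the serialized key bytes).
    int ownerOf(String key) {
        return Math.floorMod(key.hashCode(), localStores.size());
    }

    // Writes land in the store of the instance that owns the key.
    void put(String key, String value) {
        localStores.get(ownerOf(key)).put(key, value);
    }

    // A query served by one instance sees only that instance's local store.
    String queryLocal(int instance, String key) {
        return localStores.get(instance).get(key);
    }

    // Querying "from another node" means routing the request to the owner.
    String queryRouted(String key) {
        return queryLocal(ownerOf(key), key);
    }

    public static void main(String[] args) {
        PartitionedStoreSketch store = new PartitionedStoreSketch(3);
        store.put("u1", "Alice");
        int owner = store.ownerOf("u1");
        int other = (owner + 1) % 3;
        System.out.println("owner sees: " + store.queryLocal(owner, "u1"));
        System.out.println("other sees: " + store.queryLocal(other, "u1"));
        System.out.println("routed:     " + store.queryRouted("u1"));
    }
}
```

In a real application the routing step would go over the network between instances; the interactive queries page linked above describes how to discover which instance hosts a given store or key.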



Guozhang



