Posted to users@kafka.apache.org by Jason Turim <ja...@signalvine.com> on 2018/03/26 13:34:56 UTC

Am I using the state store appropriately?

We've started experimenting with Kafka to see if it can be used to
aggregate our application data. I think our use case is a match for Kafka
Streams, but we aren't sure if we are using the tool correctly. The proof
of concept we've built seems to be working as designed, but I'm not sure
that we are using the APIs appropriately.

Our proof of concept uses Kafka Streams to keep a running tally of
information about a program and writes that data to an output topic, e.g.

    {
      "numberActive": 0,
      "numberInactive": 0,
      "lastLogin": "01-01-1970T00:00:00Z"
    }

Computing the tally is easy: it is essentially a compare-and-swap (CAS)
operation based on the input topic and the output field.

The local state store is used to store the most recent program for a
given key. We join an input stream against the state store and run the CAS
operation using a TransformerSupplier, which explicitly writes the data to
the state store using

    context.put(...)
    context.commit();
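
For illustration only, the per-key update described above can be sketched in plain Java. This is not the poster's code: a HashMap stands in for the Kafka Streams state store, and the class names (ProgramTally, TallySketch), the update() signature, and the update rule itself are assumptions made up for this sketch. Only the three field names come from the example output.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value type mirroring the example output record.
final class ProgramTally {
    final int numberActive;
    final int numberInactive;
    final String lastLogin;

    ProgramTally(int numberActive, int numberInactive, String lastLogin) {
        this.numberActive = numberActive;
        this.numberInactive = numberInactive;
        this.lastLogin = lastLogin;
    }
}

final class TallySketch {
    // Assumption: a plain map stands in for the local key-value state store.
    private final Map<String, ProgramTally> store = new HashMap<>();

    // Assumed update rule: read the current tally for the key, derive the
    // next one from the incoming event, and write it back to the store.
    ProgramTally update(String key, boolean active, String loginTime) {
        ProgramTally current = store.getOrDefault(
                key, new ProgramTally(0, 0, "01-01-1970T00:00:00Z"));
        ProgramTally next = new ProgramTally(
                current.numberActive + (active ? 1 : 0),
                current.numberInactive + (active ? 0 : 1),
                // Keep the previous login when the event carries none.
                loginTime != null ? loginTime : current.lastLogin);
        store.put(key, next);
        return next;
    }
}
```

In a real topology the read and the write would go through the state store attached to the transformer, and the returned tally would be forwarded to the output topic.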

Is this an appropriate use of the local state store? Is there another
approach to keeping a stateful running tally in a topic?

thanks,
Jason

Re: Am I using the state store appropriately?

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Jason,

Your design sounds right to me (I presume you are using the Processor API
(PAPI), not the Streams DSL): you read in one stream and call transform()
on it, with a state store associated with that operator. Since your update
logic seems to depend only on the key, it is embarrassingly parallelizable
by the Streams library based on key partitioning.
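
To make the key-partitioning point concrete, here is a small plain-Java sketch of why a key-dependent update parallelizes cleanly: every record with the same key lands in the same partition, so one task owns all the state for that key. The class and method names are hypothetical, and String.hashCode() is only a stand-in; Kafka's default partitioner hashes the serialized key with murmur2.

```java
final class KeyPartitionSketch {
    // Maps a key to a partition index in [0, numPartitions).
    // Assumption: hashCode() replaces Kafka's murmur2 hash for illustration.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Because the mapping is a pure function of the key, updates for different keys never need to coordinate across partitions.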

One thing to note: it seems you are calling "context.commit()" after every
single put call, which is not a recommended pattern. `commit()` is a pretty
heavy operation that involves flushing the state store, sending a commit
offset request to the Kafka broker, etc., so calling it for every record
would result in very low throughput. It is recommended to call commit()
only after a batch of records has been processed, or you can simply set the
Streams config "commit.interval.ms" and let the Streams library call
commit() internally once per interval. Note that this will not affect your
processing semantics on graceful shutdown, since Streams always enforces a
commit() call when shutting down.
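
As a rough sketch of the two options above, in plain Java with no Kafka dependency: BatchedCommit requests a commit once per N processed records instead of once per record, and CommitConfigSketch sets "commit.interval.ms" by its string key. The class names, the Runnable standing in for context.commit(), the batch size, and the application id and broker address are all assumptions for illustration; 30000 ms is used here only as an example value.

```java
import java.util.Properties;

final class BatchedCommit {
    private final int batchSize;
    private final Runnable commit; // stands in for context.commit()
    private int sinceLastCommit = 0;

    BatchedCommit(int batchSize, Runnable commit) {
        this.batchSize = batchSize;
        this.commit = commit;
    }

    // Call once per processed record; commits only when a batch is full.
    void recordProcessed() {
        sinceLastCommit++;
        if (sinceLastCommit >= batchSize) {
            commit.run();
            sinceLastCommit = 0;
        }
    }
}

final class CommitConfigSketch {
    // Builds a configuration that leaves commit timing to the Streams library.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.setProperty("application.id", "tally-poc");        // hypothetical
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical
        props.setProperty("commit.interval.ms", "30000");         // example value
        return props;
    }
}
```

With the config-based option, the transformer would not call commit() at all and would only write to the state store.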


Guozhang

