Posted to users@kafka.apache.org by József Molnár <mo...@gmail.com> on 2018/06/18 09:18:56 UTC

Process messages from StateStore

Hi!

I have an application which uses an input and output topic, and every
message from the input topic should have a corresponding message (with the
same key) in the output topic.

To detect lost messages (=no output after a certain amount of time,
~10days) I tried to use a KTable - KTable left join and check where the
output values are null in the result KTable's state store.

Sample code:
// Stream setup
StreamsBuilder builder = new StreamsBuilder();
KTable<String, InboundMsg> inputTable = builder.table("inputTopic",
        Consumed.with(...)).filter(...);  // filter() is on the KTable, not on Consumed
KTable<String, OutboundMsg> outputTable = builder.table("outputTopic",
        Consumed.with(...));

Materialized<String, InboundMsg, KeyValueStore<Bytes, byte[]>> store =
        Materialized.<String, InboundMsg, KeyValueStore<Bytes, byte[]>>as("Store")..;
KTable<String, InboundMsg> joinedTable = inputTable.leftJoin(outputTable,
        valueJoiner, store);  // a ValueJoiner<InboundMsg, OutboundMsg, InboundMsg>

// Read from store
ReadOnlyKeyValueStore<String, InboundMsg> keyValueStore =
streams.store("Store", QueryableStoreTypes.keyValueStore());
KeyValueIterator<String, InboundMsg> allMsg = keyValueStore.all();

Is there any other way to read from the state store and possibly stream it
to a topic? Since there can be a couple of million messages in the topics,
reading all of them with an iterator will not be performant enough.
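[Editor's note: one way to avoid a full store scan might be to stream the join
result directly instead of iterating over it: keep both sides of the left join
in the joined value and forward only the entries whose output side is still
null to a dedicated topic. The sketch below is only an illustration of that
idea under assumptions — the Joined pair type, the missingOutput helper, the
String value types, and the "missingTopic" name are all made up, and serde
configuration is omitted.]

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class MissingOutputDetector {

    // Hypothetical pair type: holds the input record and, if one has
    // arrived, the matching output record (null otherwise).
    public static final class Joined<I, O> {
        public final I in;
        public final O out;
        public Joined(I in, O out) { this.in = in; this.out = out; }
    }

    // Pure helper, separated out so the "output is missing" logic can
    // be checked without a running topology.
    public static <I, O> boolean missingOutput(Joined<I, O> joined) {
        return joined != null && joined.out == null;
    }

    // Illustrative topology; the topic names and String value types
    // are placeholders, not the ones from the original application.
    public static void build(StreamsBuilder builder) {
        KTable<String, String> inputTable = builder.table("inputTopic");
        KTable<String, String> outputTable = builder.table("outputTopic");

        inputTable
            // Left join keeps every input-side entry; the output side
            // stays null until a record with the same key shows up.
            .leftJoin(outputTable, (in, out) -> new Joined<>(in, out))
            .toStream()
            // Forward only entries that still lack an output record.
            .filter((key, joined) -> missingOutput(joined))
            .mapValues(joined -> joined.in)
            .to("missingTopic");
    }
}
```

Note that this emits an entry as soon as the input arrives before the output,
so the ~10-day grace period from the question would still need separate
handling (for example a punctuator in the Processor API).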

Thanks,
Jozsef

Re: Process messages from StateStore

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Jozsef,

Your question is a little unclear to me.

> To detect lost messages 

For what topology?

>> KTable<String, InboundMsg> inputTable = builder.table("inputTopic",
>> Consumed.with(...)).filter(...);

The code you show contains a `filter()` that can remove records. Could
this be the issue?

It's also unclear to me whether the original "inputTopic" is a record
stream or a changelog stream. Thus, I am not sure if your join idea is
the right approach. It would be important to understand the actual
topology that produced the output topic.

Also note that some DSL operators (like aggregate() and join()) drop
records that have a null key or null value. Maybe this is the reason for
the missing data in the output?
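[Editor's note: to illustrate that last point, one option might be to filter
out, or side-track, null-key/null-value records before they reach a join, so
that nothing is dropped silently. A minimal sketch; the topic names and the
String key/value types are placeholders, and serde setup is omitted.]

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class NullGuard {

    // Pure predicate: joins and aggregations silently drop records
    // failing this check, so apply it explicitly up front.
    public static boolean joinable(String key, String value) {
        return key != null && value != null;
    }

    public static void build(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("inputTopic");

        // Side-track the records a join would drop, for inspection.
        input.filterNot(NullGuard::joinable).to("droppedRecordsTopic");

        // Only well-formed records continue toward the join.
        KStream<String, String> valid = input.filter(NullGuard::joinable);
        // ... join 'valid' with the other side as before ...
    }
}
```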


-Matthias

