Posted to users@kafka.apache.org by "An, Hongguo (CORP)" <Ho...@ADP.com.INVALID> on 2023/06/28 15:49:25 UTC

Kafka Streaming: RocksDbSessionBytesStoreSupplier seems lost data in Kubernetes

Hi:
I am using RocksDbSessionBytesStoreSupplier in my Kafka Streams application for an aggregation like this:


var materialized =
        Materialized.<String, List<CDCRecord>>as(
                        new RocksDbSessionBytesStoreSupplier(
                                env.getProperty("messages.cdc.pft.topic", "NASHCM.PAYROLL.PFT.FILENUMBER"),
                                Duration.parse(env.getProperty("pft.duration", "P7D")).toMillis()))
                .withKeySerde(stringSerde)
                .withValueSerde(listSerde);

stream.windowedBy(SessionWindows
                .with(Duration.parse(env.getProperty("pft.gap", "PT0.1S")))
                .grace(Duration.parse(env.getProperty("pft.duration", "PT0.05S"))))
        .aggregate(ArrayList::new,
                (k, v, list) -> { list.add(v); return list; },
                (k, list1, list2) -> { list1.addAll(list2); return list1; },
                materialized)
        .toStream()
        .foreach((key, value) -> {
            // Sometimes value is null, which should never happen, and we do see some messages not processed.
        });



The application runs on Kubernetes. Should we avoid using RocksDbSessionBytesStoreSupplier there?



Thanks

Andrew



This message and any attachments are intended only for the use of the addressee and may contain information that is privileged and confidential. If the reader of the message is not the intended recipient or an authorized representative of the intended recipient, you are hereby notified that any dissemination of this communication is strictly prohibited. If you have received this communication in error, notify the sender immediately by return email and delete the message and any attachments from your system.

Re: Kafka Streaming: RocksDbSessionBytesStoreSupplier seems lost data in Kubernetes

Posted by "Matthias J. Sax" <mj...@apache.org>.
The class `RocksDbSessionBytesStoreSupplier` is in the `internals` package, 
and thus you should not use it directly. Instead, you should use the 
public factory class `org.apache.kafka.streams.state.Stores`.

However, your usage seems correct in general.

Not sure why you pass in the supplier directly, though. In the end, if 
you want to set a name for the store, you can use 
`Materialized.as("...")`, and you can set the retention time via 
`Materialized#withRetention(...)` (which would be the proper usage of the 
API).
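
As a rough sketch of the two public-API routes mentioned above (not code 
from the original poster's application): the class name `SessionStoreSetup`, 
the method names, and the store name used in the usage note are hypothetical, 
and the serdes are left out for brevity.

```java
import java.time.Duration;
import java.util.List;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;

// Hypothetical helper class, for illustration only.
final class SessionStoreSetup {

    // Route 1: name the store and set its retention through Materialized alone.
    static <V> Materialized<String, List<V>, SessionStore<Bytes, byte[]>> named(
            String storeName, Duration retention) {
        return Materialized.<String, List<V>, SessionStore<Bytes, byte[]>>as(storeName)
                .withRetention(retention);
    }

    // Route 2: if a supplier is really needed, get it from the public Stores
    // factory instead of instantiating the internal class directly.
    static SessionBytesStoreSupplier supplier(String storeName, Duration retention) {
        return Stores.persistentSessionStore(storeName, retention);
    }
}
```

Either result can then be completed with `.withKeySerde(stringSerde)` and 
`.withValueSerde(listSerde)` as in the original snippet, for example 
`SessionStoreSetup.<CDCRecord>named("pft-session-store", Duration.parse("P7D"))`.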

Besides this, the store should be backed by a changelog topic, and thus 
you should never lose any data, independent of your deployment.

Of course, I would recommend using a StatefulSet and re-attaching storage 
to the pod to avoid re-creating the store from the changelog.
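
For re-attached storage to help, the local state has to live on the mounted 
volume. A minimal sketch, assuming the persistent volume is mounted at the 
hypothetical path `/var/lib/kafka-streams` and using plain string keys for 
the Kafka Streams settings (the class and method names are illustrative):

```java
import java.util.Properties;

// Hypothetical helper class, for illustration only.
final class StreamsStateDirConfig {

    // Assumed mount path of the persistent volume inside the pod.
    static final String STATE_DIR = "/var/lib/kafka-streams";

    static Properties streamsProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // "state.dir" is where Kafka Streams keeps its local RocksDB state;
        // pointing it at the mounted volume lets a restarted pod reuse it.
        props.put("state.dir", STATE_DIR);
        return props;
    }
}
```

The resulting `Properties` would be passed to the `KafkaStreams` constructor 
together with the topology; the remaining settings (serdes and so on) are 
omitted here.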

HTH,

-Matthias

