Posted to users@kafka.apache.org by José Antonio Iñigo <jo...@gmail.com> on 2017/07/21 07:25:52 UTC

Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Hi everybody,

I have been struggling with this problem for quite a while now, resorting
to stackoverflow
<https://stackoverflow.com/questions/45144429/event-sourcing-apache-kafka-kafka-streams-how-to-assure-atomicity-transa>
for some help with no success. I am hoping that here I'll get a more
authoritative answer from experienced Kafka users.

This is the summary of my problem:

- I am developing an application based on Spring Boot Microservices for a
shopping domain.
- I want to use Event Sourcing, with Kafka recording the events and
Kafka Streams state stores materializing the views.
- To simplify the scenario we'll consider only two domains: Orders and
Products.
- The conflicting part:
   1) OrderService publishes an OrderPlaced event indicating a productId
and the quantity.
   2) ProductService consumes the event and queries (with an interactive
query) its Kafka Streams Store (ProductsStore) to check the availability of
the product. If there is availability it publishes a ProductReserved event
with productId and quantity:

if("OrderPlaced".equals(event.get("eventType"))){

    Order order = new Order();
    order.setId((String)event.get("orderId"));
    order.setProductId((Integer)(event.get("productId")));
    order.setUid(event.get("uid").toString());

    // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
    Integer productStock = getProductStock(order.getProductId());

    if(productStock > 0) {
        Map<String, Object> event = new HashMap<>();
        event.put("name", "ProductReserved");
        event.put("orderId", order.getId());
        event.put("productId", order.getProductId());

        // WRITES A PRODUCT RESERVED EVENT TO orders topic

orderProcessor.output().send(MessageBuilder.withPayload(event).build(),
500);
    }else{
        //XXX CANCEL ORDER
    }
}

   Then ProductService consumes its own event in a Kafka Streams processor
to update the stock of the product in the ProductsStore.

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde,
jsonSerde, "orders");
stream.filter(...).groupByKey().reduce((...) -> {...}, "ProductsStock");
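
For reference, the interactive query used in step 2) looks roughly like this (same
store name as in the reduce above; error handling and store-not-ready retries omitted):

public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    // Interactive query against the local "ProductsStock" store materialized by the reduce above
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
            streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}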

   3.1) Suppose that in 1) two orders were placed simultaneously for the
same product and there is only stock for one of them.
   3.2) ProductService would process the first one, the stock check would pass
and it would publish the ProductReserved event.
   3.3) We can't guarantee that the Kafka Streams processor in ProductService
will have processed the ProductReserved event for order1 (updating
ProductsStore) before the OrderPlaced event for order2 is processed. In that
case ProductService will incorrectly publish a ProductReserved for order2,
creating an inconsistency.

IMPORTANT: You can find the detailed description, with code and the events
that are published and consumed in the previously referenced stackoverflow
question.

After a lot of thinking and searching online I haven't found a single place
that clearly explains how to do Event Sourcing with Kafka + Kafka Streams
while solving this atomicity problem.

I'd really appreciate it if someone could propose a solution for this.

Regards
Jose

Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by Jay Kreps <ja...@confluent.io>.
Hey Chris,

I heard a similar complaint from a few people. I am quite ignorant about
event sourcing and don't feel I understand the relationship fully but I am
interested in understanding a little better what you are saying.

I think we see the world this way:

   1. You store the log of primary events in Kafka
   2. You can create queryable materialized views/indexes derived off of
   these events in Kafka Streams, which I believe would include what in event
   sourcing is called aggregates.

If you change the logic by which aggregates are computed off the raw events
you would rerun the streams app that derived it to recompute the derived
state from the event log. Since this is Kafka this can be done in a
blue/green fashion where you keep the old version of the app running and
start a new version in parallel which recomputes the state from scratch,
then cut over to the newly derived version of your app. In other words the
state is part of the app (which may be split over many instances) not part
of some remote db shared by many versions of the app.
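
As a rough sketch of what that looks like in code (hypothetical topic, serde and
store names; the pre-1.0 API used elsewhere in this thread), the derived view is
just a streams app that aggregates the event log into a local store, and pointing
the same topology at a new application.id rebuilds that state from the beginning
of the log:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-view-v2"); // new id => state recomputed from scratch
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");   // start from the beginning of the event log

KStreamBuilder builder = new KStreamBuilder();
builder.stream(Serdes.String(), orderEventSerde, "order-events")  // the log of primary events
       .groupByKey(Serdes.String(), orderEventSerde)
       .count("OrdersView");                                      // queryable materialized view ("aggregate")

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();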

The things that I have heard are missing:

   1. Your query indexes in the streams app are only eventually consistent
   with writes to the Kafka topic. There is no read-after-write consistency.
   2. You can't compute individual aggregates on their own. That is if you
   have an Order aggregate you can recompute the set of orders from scratch
   but you can't recompute just Order 12453.
   3. The query functionality in streams is quite limited.

Of these, I think (1) and (3) are real limitations for many use cases.

I am actually not sure if (2) is a problem. In general, I do think that if
you change your logic for deriving aggregates from events, the only way to
correctly regenerate your state is to recompute off the event log, right?
Doing this in a one-off way for just some entities may result in derived
state that doesn't match the code and input events you have in odd ways.
Anyhow not sure if that is what you are saying is missing but other people
have said that.

Does that match what you are saying? I actually am too ignorant of this
area and its terminology to fully understand what you mean by the three
examples you give.

-Jay


On Fri, Jul 21, 2017 at 6:51 AM, Chris Richardson <chris@chrisrichardson.net
> wrote:

> Hi,
>
> I like Kafka but I don't understand the claim that it can be used for Event
> Sourcing (here <http://microservices.io/patterns/data/event-sourcing.html>
> and here <https://martinfowler.com/eaaDev/EventSourcing.html>)
>
> One part of event sourcing is the ability to subscribe to the events
> published by aggregates, and clearly Kafka works well there.
>
> But the other part of Event Sourcing is "database" like functionality,
> which includes
>
>    - findEventsByPrimaryKey() - needed to be able to reconstruct an
>    aggregate from its events - the essence of event sourcing
>    - Atomic updates - for updating aggregates (findEventsByPrimaryKey() ->
>    business logic -> insertNewEvents()) in order to handle this kind of
>    scenario.
>
> The approach we have taken is to implement event sourcing using a database
> and Kafka.
> For instance: see
> https://blog.eventuate.io/2016/10/06/eventuate-local-event-sourcing-and-cqrs-with-spring-boot-apache-kafka-and-mysql/
>
> Chris
>
> --
> Learn microservices - http://learnmicroservices.io
> Microservices application platform http://eventuate.io
>
>

Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by José Antonio Iñigo <jo...@gmail.com>.
My previous mail was in fact addressed to Ben, not Chris, sorry for the
mistake.

Regards

On Sat, 22 Jul 2017 at 00:15, José Antonio Iñigo <
joseantonio.inigo@gmail.com> wrote:

> Hi Chris,
>
> *"if I understand your problem correctly, the issue is that you need to*
>
>
> *decrement the stock count when you reserve it, rather than splitting it*
> *into a second phase."*
>
> That's exactly the problem, I would need to:
>
> 1) Read the OrderPlaced event from Kafka in ProductService...
> 2) ...query the ProductsStock store to check availability...
> 3) ...update the Product in the same phase (OrderPlacedEvent processing)
> 4) ...publish a ProductReserved message
>
>         // 1) Read the OrderPlaced event...
> @StreamListener(OrderProcessor.INPUT)
> public void handleOrder(Map<String, Object> event) {
>     logger.info("Event {}", event);
>     if ("OrderPlaced".equals(event.get("name"))) {
>         Order order = new Order();
>         order.setId((String) event.get("orderId"));
>         order.setProductId((Integer) event.get("productId"));
>         order.setUid(event.get("uid").toString());
>         ...
>         // 2) Query the ProductsStockStore...
>         Integer productStock = getProductStock(order.getProductId());
>         if (productStock != null && productStock > 0) {
>             // 3) Update the ProductsStockStore
>             ???
>
>             // 4) Publish a new message. No problem here
>         }
>     }
> }
>
> @Override
> public Integer getProductStock(Integer id) {
>     KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
>     ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
>         streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
>     return keyValueStore.get(id);
> }
>
> However the only way I know of updating the store is publishing a new
> event, ProductReserved, that will be processed by the KStream as a separate
> step (new Kafka message):
>
> Map<String, Object> event = new HashMap<>();
> event.put("name", "ProductReserved");
> event.put("orderId", order.getId());
> event.put("productId", order.getProductId());
> event.put("quantity", -1);
> // 3) Update the ProductStore
> orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
>
> This is the separate KStream config; notice // 3), where the update takes
> place:
>
> @Configuration
> public class KStreamsConfig {
>
>     @Bean
>     public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
>                                  KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
>
>         Serde<Integer> integerSerde = Serdes.Integer();
>         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
>         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>         KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
>
>         // 3) Update the ProductStore
>         stream.filter((key, value) -> value != null &&
>                     value.get("name").asText().equals("ProductReserved"))
>               .map((key, value) -> new KeyValue<>(value.get("productId").asInt(),
>                                                   value.get("quantity").asInt()))
>               .groupByKey()
>               .reduce((v1, v2) -> v1 + v2, "ProductsStock");
>         return stream;
>     }
> }
>
> I've had a look at the StateStoresInTheDSLIntegrationTest.java
> <https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java> but
> I still don't get how to integrate the update step in // 2). No idea how
> I can do all this in the same phase:
>
> - Consume a message
> - Query a KStreams store
> - Update the KStreams store
> - Publish a ProductReserved message.
>
> Could you please outline the necessary code to do it?
>
> Thank you so much.
> Jose
>

Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by José Antonio Iñigo <jo...@gmail.com>.
I didn't say anything... my producer wasn't working properly, messages are
arriving now.

Finally I got it working :-) Thanks so much!!


Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by José Antonio Iñigo <jo...@gmail.com>.
Hi Ben,

now I can see what you meant previously about using a Transformer. I was
following the wrong approach, dividing the processing between a listener and a
stream processor.

There's only one thing left that I don't know how to work out. This is a draft
of my code based on yours:

@Bean
@SuppressWarnings("unchecked")
public KStream<?, ?> kStream2(KStreamBuilder builder,
                              KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
    final Serde<Integer> integerSerde = Serdes.Integer();
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStream<String, JsonNode> unvalidatedOrdersStream = builder.stream(ORDERS_TOPIC);
    KStream<String, JsonNode> stockStream = builder.stream(PRODUCTS_TOPIC);

    StateStoreSupplier<StateStore> productStore = Stores.create(PRODUCTS_STORE)
            .withKeys(integerSerde)
            .withValues(jsonSerde)
            .persistent()
            .build();
    builder.addStateStore(productStore);

    ValueJoiner<JsonNode, JsonNode, Map<String, String>> valueJoiner =
            (JsonNode value1, JsonNode value2) -> new HashMap<>();
    stockStream.branch(predicates)
    KStream<String, Map<String, String>> orderOutputs =
            unvalidatedOrdersStream.<JsonNode, Map<String, String>>outerJoin(
                    stockStream, valueJoiner, JoinWindows.of(1000));

    orderOutputs.<String, Map<String, String>>transform(() -> new StockCountTransformer(), PRODUCTS_STORE)
                .filter((key, value) -> value != null)
                .to(ORDERS_TOPIC);

    return orderOutputs;
}

There are two ways of updating the product store:
- ProductService has a REST endpoint that publishes ProductAdded events to
the products topic.
- OrderService sends an OrderPlaced event to the orders topic.

The problem now is that, if I understand it right, in order to update the
PRODUCTS_STORE there must be a join of an OrderPlaced event and a
ProductAdded event *within a certain join window*. If there aren't Order and
Product events that happen within that time window, nothing will be updated in
the store. What's more, ProductService should be able to update its store
without having anything to do with the orders, shouldn't it? I have tried
publishing ProductAdded events and nothing happens. Could you give me a
hint about how to deal with this?

Thanks again for your time!!


Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by Ben Stopford <be...@confluent.io>.
No worries Jose ;-)

So there are a few ways you could do this, but I think it’s important that
you manage a single “stock level” state store, backed by a changelog. Use
this for validation, and keep it up to date at the same time. You should
also ensure the input topic(s) are partitioned by productId so any update
to, or validation of, the same product will be sequenced. This effectively
ensures the mutations of the quantities in stock will be atomic.
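
For example, on the producing side it's enough to key every record by productId
(hypothetical helper and topic names); records with the same key land on the same
partition and are therefore seen in order by the same stream task:

// Hypothetical helper: keying by productId means all events for a product hit the
// same partition, so the stream task that owns that partition processes them in order.
void publish(KafkaProducer<Integer, String> producer, String topic, int productId, String eventJson) {
    producer.send(new ProducerRecord<>(topic, productId, eventJson));
}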

So say we have two inputs: OrderRequests, StockUpdates

Order requests need to validate that there is sufficient stock, via the
product store, then decrement the stock value in that store:

public Event validateInventory(OrderRequestEvent order, KeyValueStore<> store) {
    Long stockCount = store.get(order.product);
    if (stockCount - order.quantity >= 0) {
        // decrement the value in the store
        store.put(order.product, stockCount - order.quantity);
        return new OrderValidatedEvent(Validation.Passed);
    } else {
        return new OrderValidatedEvent(Validation.Failed);
    }
}

Stock updates need to increase the stock value in the product store as new
stock arrives.

public void updateStockStore(StockUpdateEvent update, KeyValueStore<> store) {
    // read the current level from the store, then add the incoming amount
    Long current = store.get(update.product);
    store.put(update.product, current + update.amount);
}

To do the processing we merge the input streams, then push this into a
transformer that uses a single state store to manage the mapping between
products and their stock levels.

KStream<byte[], String> unvalidatedOrdersStream = builder.stream(orderTopic);
KStream<byte[], String> stockStream = builder.stream(stockUpdateTopic);

StateStoreSupplier productStore = Stores.create(productStoreName)...build()

KStream<byte[], String> orderOutputs =
        unvalidatedOrdersStream.outerJoin(stockStream, ...)
                .transform(StockCheckTransformer::new, productStoreName)
                .filter((key, value) -> value != "");

orderOutputs.to(validatedOrdersStream);


With the transformer both managing and validating against the stock levels.

StockCountTransformer { ….

    public KeyValue<byte[], Event> transform(ProductId key, Event event) {
        if (event.isStockUpdate()) {
            Stock update = parseStock(value);
            return KeyValue.pair(key,
                    updateStockStore(parseStockUpdate(event), productStore))
        } else if (event.isOrderRequest()) {
            return KeyValue.pair(key,
                    validateInventory(parseOrderReq(event), productStore))
        }
    }
}
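
Fleshed out a little, the transformer would obtain the store in init() from the
ProcessorContext. A rough sketch against the pre-1.0 Transformer API used elsewhere
in this thread (ProductId, Event, the parse* helpers and validateInventory /
updateStockStore are the ones sketched above; everything else is assumed):

public class StockCountTransformer
        implements Transformer<ProductId, Event, KeyValue<ProductId, Event>> {

    // must match the name used when the store was created with Stores.create(...)
    private static final String PRODUCT_STORE_NAME = "ProductStore";

    private KeyValueStore<ProductId, Long> productStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // look up the store registered under the name passed to transform(..., storeName)
        productStore = (KeyValueStore<ProductId, Long>) context.getStateStore(PRODUCT_STORE_NAME);
    }

    @Override
    public KeyValue<ProductId, Event> transform(ProductId key, Event event) {
        if (event.isStockUpdate()) {
            updateStockStore(parseStockUpdate(event), productStore);
            return KeyValue.pair(key, null);   // marker result, filtered out downstream
        } else if (event.isOrderRequest()) {
            return KeyValue.pair(key, validateInventory(parseOrderReq(event), productStore));
        }
        return KeyValue.pair(key, null);
    }

    @Override
    public KeyValue<ProductId, Event> punctuate(long timestamp) {
        return null;                           // no scheduled work
    }

    @Override
    public void close() {}
}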

Now the stock levels will be held in the changelog topic which backs the
ProductStore which we can reuse if we wish.

I think we could also optimise this code a bit by splitting into two
transformers via streams.branch(..).
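
Something like this, as a sketch (the predicates, the second transformer and the
merged stream variable are hypothetical):

KStream<byte[], String>[] branches = merged.branch(
        (key, value) -> isStockUpdate(value),    // branches[0]: stock updates
        (key, value) -> isOrderRequest(value));  // branches[1]: order requests

branches[0].transform(StockUpdateTransformer::new, productStoreName);
branches[1].transform(OrderValidationTransformer::new, productStoreName)
           .to(validatedOrdersTopic);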

Regarding EoS. This doesn’t add any magic to your processing logic. It just
guarantees that your stock count will be accurate in the face of failure
(i.e. you don’t need to manage idempotence yourself).
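
(For reference, turning it on is a single config, assuming brokers and clients are on 0.11+:)

props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);  // "processing.guarantee" = "exactly_once"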

B



Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by José Antonio Iñigo <jo...@gmail.com>.
Hi Garret,

At the moment, to simplify the problem I only have one topic, orders, where
I add products and decrement them based on ProductAdded and ProductReserved
events.

Yesterday I was reading about EoS but I don't know if it'll solve the
problem. Dividing the query and the update into two steps means that the event
ordering could be:

OrderPlaced (query stock ok)
OrderPlaced (query stock ok)
ProductReserved (update stock)
ProductReserved (update stock)

As far as EoS is concerned this sequence is correct: the messages are delivered
once, in the order in which they were generated. The problem is the ordering
itself. If there were a way to query the store, update it and generate the event
in one step, producing instead the following sequence of events, there wouldn't
be any problem:

OrderPlaced->ProductReserved (query stock ok + Update stock store +
reserved event)
OrderPlaced->ProductNoStock (query stock fail so no update and out-of-stock
event)
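
Something like this single step, roughly (hypothetical names; it is essentially
what the Transformer approach in Ben's reply does against its state store):

// Sketch: validate, decrement and emit in one step, inside the stream task that owns the store.
public KeyValue<Integer, Map<String, Object>> handleOrderPlaced(Integer productId,
                                                                Map<String, Object> orderPlaced,
                                                                KeyValueStore<Integer, Integer> stockStore) {
    Integer stock = stockStore.get(productId);                    // query stock
    int quantity = (Integer) orderPlaced.get("quantity");
    Map<String, Object> result = new HashMap<>();
    result.put("orderId", orderPlaced.get("orderId"));
    result.put("productId", productId);
    if (stock != null && stock >= quantity) {
        stockStore.put(productId, stock - quantity);              // update the stock store in the same step
        result.put("name", "ProductReserved");
    } else {
        result.put("name", "ProductNoStock");                     // no update, out-of-stock event
    }
    return KeyValue.pair(productId, result);                      // event emitted downstream
}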

Regards


Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by Garrett Barton <ga...@gmail.com>.
Could you take in both topics via the same stream? Meaning don't do a Kafka
Streams join, literally just read both streams. If KStream can't do this
(dunno, haven't tried), then a simple upstream merge job could throw them into
one topic with the same partitioning scheme.

I'd assume the products stream would act as some kind of incrementer on state
(within the local state store), and the orders stream would act as a decrement
to the same stream task. Add exactly-once semantics and you skirt the issue of
having to wait for the update to come back around.

Thoughts?
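
A rough sketch of what that could look like with the same pre-1.0 API used elsewhere
in this thread (topic names are assumed, and the transformer is Ben's StockCountTransformer
adapted to increment on ProductAdded and decrement/validate on OrderPlaced); both topics
are read into one stream keyed by productId and a single store holds the counts:

KStreamBuilder builder = new KStreamBuilder();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());

// One persistent store for the stock counts, shared by both event types.
builder.addStateStore(Stores.create("ProductsStock")
        .withKeys(Serdes.Integer())
        .withValues(Serdes.Integer())
        .persistent()
        .build());

// Read both topics into a single stream; both must be keyed/partitioned by productId.
KStream<Integer, JsonNode> events =
        builder.stream(Serdes.Integer(), jsonSerde, "products", "orders");

// One transformer increments on ProductAdded and decrements/validates on OrderPlaced.
events.transform(StockCountTransformer::new, "ProductsStock")
      .filter((key, value) -> value != null)
      .to(Serdes.Integer(), jsonSerde, "validated-orders");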


Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by José Antonio Iñigo <jo...@gmail.com>.
Hi Chris,



*"if I understand your problem correctly, the issue is that you need
todecrement the stock count when you reserve it, rather than splitting it*
*into a second phase."*

That's exactly the problem. I would need to:

1) Read the OrderPlaced event from Kafka in ProductService...
2) ...query the ProductsStock store to check availability...
3) ...update the Product in the same phase (OrderPlacedEvent processing)
4) ...publish a ProductReserved message

// 1) Read the OrderPlaced event...
@StreamListener(OrderProcessor.INPUT)
public void handleOrder(Map<String, Object> event) {
    logger.info("Event {}", event);
    if ("OrderPlaced".equals(event.get("name"))) {
        Order order = new Order();
        order.setId((String) event.get("orderId"));
        order.setProductId((Integer) (event.get("productId")));
        order.setUid(event.get("uid").toString());
        ...
        // 2) Query the ProductsStockStore...
        Integer productStock = getProductStock(order.getProductId());
        if (productStock != null && productStock > 0) {
            // 3) Update the ProductsStockStore
            ???

            // 4) Publish a new message. No problem here

        }

@Override
public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
        streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}

However, the only way I know of updating the store is publishing a new
ProductReserved event that will be processed by the KStream as a separate step
(a new Kafka message):

Map<String, Object> event = new HashMap<>();
event.put("name", "ProductReserved");
event.put("orderId", order.getId());
event.put("productId", order.getProductId());
event.put("quantity", -1);
// 3) Update the ProductStore
orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);

This is the separate KStream config; notice // 3), where the update takes
place:

@Configuration
public class KStreamsConfig {

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
                                 KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");

        // 3) Update the ProductStore
        stream.filter((key, value) -> value != null &&
                          value.get("name").asText().equals("ProductReserved"))
              .map((key, value) -> new KeyValue<>(value.get("productId").asInt(),
                                                  value.get("quantity").asInt()))
              .groupByKey()
              .reduce((v1, v2) -> v1 + v2, "ProductsStock");
        return stream;
    }
}

I've had a look at the StateStoresInTheDSLIntegrationTest.java
<https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java>
but
I still don't get how to integrate the update step in // 2). No idea how I
can do all this in the same phase:

- Consume a message
- Query a KStreams store
- Update the KStreams store
- Publish a ProductReserved message.

Could you please outline the necessary code to do it?

Thank you so much.
Jose

Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by Ben Stopford <be...@confluent.io>.
Hi Jose
If I understand your problem correctly, the issue is that you need to
decrement the stock count when you reserve it, rather than splitting it
into a second phase. You can do this via the DSL with a Transformer. There's
a related example below. Alternatively you could do it with the processor
API. As Michael suggested, you can now do this atomically with the EoS
feature.

Hi Chris
Kafka Streams provides the index that gives you the findByPk semantic.
Although for high-performance use cases the Kafka broker is often used
alone, via the Memory Image approach.

B
https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java
https://martinfowler.com/bliki/MemoryImage.html
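
For Jose's concrete case, a rough sketch of what such a Transformer could look
like on the 0.11-era API ("orders" and "ProductsStock" are the names from
Jose's code; the output topic "order-events", the exact event fields, and
keying the orders topic by productId are assumptions):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class StockTopology {

    // Checks and decrements stock for an OrderPlaced event in one step,
    // then emits either ProductReserved or OrderRejected.
    static class StockTransformer
            implements Transformer<Integer, JsonNode, KeyValue<Integer, JsonNode>> {

        private KeyValueStore<Integer, Integer> stock;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            stock = (KeyValueStore<Integer, Integer>) context.getStateStore("ProductsStock");
        }

        @Override
        public KeyValue<Integer, JsonNode> transform(Integer productId, JsonNode event) {
            if (event == null || event.get("name") == null
                    || !"OrderPlaced".equals(event.get("name").asText())) {
                return null; // ignore everything that is not an OrderPlaced event
            }
            Integer available = stock.get(productId);
            boolean reserved = available != null && available > 0;
            if (reserved) {
                stock.put(productId, available - 1); // query and update in the same step
            }
            ObjectNode out = JsonNodeFactory.instance.objectNode();
            out.put("name", reserved ? "ProductReserved" : "OrderRejected");
            out.put("orderId", event.get("orderId").asText());
            out.put("productId", productId);
            return KeyValue.pair(productId, out);
        }

        @Override
        public KeyValue<Integer, JsonNode> punctuate(long timestamp) {
            return null; // nothing scheduled
        }

        @Override
        public void close() {}
    }

    public static KStream<Integer, JsonNode> build(KStreamBuilder builder) {
        Serde<JsonNode> jsonSerde =
                Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());

        // The store belongs to this topology, so the read-modify-write above runs
        // on the task's local state, not through an interactive query.
        builder.addStateStore(Stores.create("ProductsStock")
                .withKeys(Serdes.Integer())
                .withValues(Serdes.Integer())
                .persistent()
                .build());

        // Assumes OrderPlaced events are keyed by productId (repartition first if not).
        KStream<Integer, JsonNode> orders =
                builder.stream(Serdes.Integer(), jsonSerde, "orders");

        orders.transform(() -> new StockTransformer(), "ProductsStock")
              .to(Serdes.Integer(), jsonSerde, "order-events"); // assumed output topic

        return orders;
    }
}

Because the store is local to the task that owns the product's partition, two
concurrent orders for the same product are processed one after the other, and
with processing.guarantee=exactly_once the store update and the emitted event
are committed together.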


On Fri, Jul 21, 2017 at 3:15 PM Michal Borowiecki <
michal.borowiecki@openbet.com> wrote:

> With Kafka Streams you get those and atomicity via Exactly-once-Semantics.
>
> Michał
>
>
> On 21/07/17 14:51, Chris Richardson wrote:
> > Hi,
> >
> > I like Kafka but I don't understand the claim that it can be used for
> Event
> > Sourcing (here <
> http://microservices.io/patterns/data/event-sourcing.html>
> > and here <https://martinfowler.com/eaaDev/EventSourcing.html>)
> >
> > One part of the event sourcing is the ability to subscribe to events
> > published by aggregates and clearly Kafka works well there.
> >
> > But the other part of Event Sourcing is "database" like functionality,
> > which includes
> >
> >     - findEventsByPrimaryKey() - needed to be able to reconstruct an
> >     aggregate from its events - the essence of event sourcing
> >     - Atomic updates -  for updating aggregates  -
> findEventsByPrimaryKey()
> >     - business logic - insertNewEvents()) in order to handle this kind of
> >     scenario.
> >
> > The approach we have taken is to implement event sourcing using a
> database
> > and Kafka.
> > For instance: see
> >
> https://blog.eventuate.io/2016/10/06/eventuate-local-event-sourcing-and-cqrs-with-spring-boot-apache-kafka-and-mysql/
> >
> > Chris
> >
>

Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by Michal Borowiecki <mi...@openbet.com>.
With Kafka Streams you get those and atomicity via Exactly-once-Semantics.
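
Turning that on is a single config (a minimal sketch; the application id and
broker address are placeholders, and it requires 0.11.0+ brokers):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service");   // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // State store updates, changelog writes and output records are then
        // committed atomically per task.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}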

Michał


On 21/07/17 14:51, Chris Richardson wrote:
> Hi,
>
> I like Kafka but I don't understand the claim that it can be used for Event
> Sourcing (here <http://microservices.io/patterns/data/event-sourcing.html>
> and here <https://martinfowler.com/eaaDev/EventSourcing.html>)
>
> One part of the event sourcing is the ability to subscribe to events
> published by aggregates and clearly Kafka works well there.
>
> But the other part of Event Sourcing is "database" like functionality,
> which includes
>
>     - findEventsByPrimaryKey() - needed to be able to reconstruct an
>     aggregate from its events - the essence of event sourcing
>     - Atomic updates -  for updating aggregates  - findEventsByPrimaryKey()
>     - business logic - insertNewEvents()) in order to handle this kind of
>     scenario.
>
> The approach we have taken is to implement event sourcing using a database
> and Kafka.
> For instance: see
> https://blog.eventuate.io/2016/10/06/eventuate-local-event-sourcing-and-cqrs-with-spring-boot-apache-kafka-and-mysql/
>
> Chris
>



Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by Debasish Ghosh <gh...@gmail.com>.
Kafka has quite a few tricks up its sleeve that can help in implementing
event-sourced systems:

   - application state - one of the things that you may want to do in an
   event-sourced system is manage and query the state of the application. If
   you use Kafka Streams, you get full state management through local state
   stores backed up by persistent, log-compacted Kafka topics, so the state
   is resilient as well.
   - for database-like functionality you can use Kafka Connect to store the
   events in a real database and query it from there.
   - and with 0.11 you get exactly-once processing with transactions as well
   (a small producer sketch follows below). Usually in Kafka you don't do
   updates, though: you append to the log and the state gets updated in the
   local store and in the backing topic.
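
To make the transactions point concrete, a minimal producer-side sketch (the
topic, keys and event payloads here are made up; a Streams app would instead
just set processing.guarantee=exactly_once):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalAppend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "product-service-tx");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            // Made-up event payloads: both appends become visible to
            // read_committed consumers together, or not at all.
            producer.send(new ProducerRecord<>("orders", "42",
                    "{\"name\":\"ProductReserved\",\"orderId\":\"o-1\",\"productId\":42}"));
            producer.send(new ProducerRecord<>("orders", "42",
                    "{\"name\":\"StockDecremented\",\"productId\":42,\"quantity\":-1}"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal for this producer instance; it can only be closed.
        } catch (KafkaException e) {
            producer.abortTransaction(); // recoverable: abort and retry if desired
        } finally {
            producer.close();
        }
    }
}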

regards.

On Fri, Jul 21, 2017 at 7:21 PM, Chris Richardson <chris@chrisrichardson.net
> wrote:

> Hi,
>
> I like Kafka but I don't understand the claim that it can be used for Event
> Sourcing (here <http://microservices.io/patterns/data/event-sourcing.html>
> and here <https://martinfowler.com/eaaDev/EventSourcing.html>)
>
> One part of the event sourcing is the ability to subscribe to events
> published by aggregates and clearly Kafka works well there.
>
> But the other part of Event Sourcing is "database" like functionality,
> which includes
>
>    - findEventsByPrimaryKey() - needed to be able to reconstruct an
>    aggregate from its events - the essence of event sourcing
>    - Atomic updates -  for updating aggregates  - findEventsByPrimaryKey()
>    - business logic - insertNewEvents()) in order to handle this kind of
>    scenario.
>
> The approach we have taken is to implement event sourcing using a database
> and Kafka.
> For instance: see
> https://blog.eventuate.io/2016/10/06/eventuate-local-
> event-sourcing-and-cqrs-with-spring-boot-apache-kafka-and-mysql/
>
> Chris
>
> --
> Learn microservices - http://learnmicroservices.io
> Microservices application platform http://eventuate.io
>
>
> On Fri, Jul 21, 2017 at 12:25 AM, José Antonio Iñigo <
> joseantonio.inigo@gmail.com> wrote:
>
> > Hi everybody,
> >
> > I have been struggling with this problem for quite a while now, resorting
> > to stackoverflow
> > <https://stackoverflow.com/questions/45144429/event-
> > sourcing-apache-kafka-kafka-streams-how-to-assure-atomicity-transa>
> > for some help with no success. I am hoping to that here I'll get a more
> > authoritative answer from experienced Kafka users.
> >
> > This is the summary of my problem:
> >
> > - I am developing an application based on Spring Boot Microservices for a
> > shopping domain.
> > - I want to use Event Sourcing, having Kafka to register the events and
> > Kafka Streams API stores to materialize the views.
> > - To simplify the scenario we'll consider only two domains: Orders and
> > Products.
> > - The conflicting part:
> >    1) OrderService publishes an OrderPlaced event indicating a productId
> > and the quantity.
> >    2) ProductService consumes the event and queries (with an interactive
> > query) its Kafka Streams Store (ProductsStore) to check the availability
> of
> > the product. If there is availabilty it publishes a ProductReserved event
> > with productId and quantity:
> >
> > if("OrderPlaced".equals(event.get("eventType"))){
> >
> >     Order order = new Order();
> >     order.setId((String)event.get("orderId"));
> >     order.setProductId((Integer)(event.get("productId")));
> >     order.setUid(event.get("uid").toString());
> >
> >     // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
> >     Integer productStock = getProductStock(order.getProductId());
> >
> >     if(productStock > 0) {
> >         Map<String, Object> event = new HashMap<>();
> >         event.put("name", "ProductReserved");
> >         event.put("orderId", order.getId());
> >         event.put("productId", order.getProductId());
> >
> >         // WRITES A PRODUCT RESERVED EVENT TO orders topic
> >
> > orderProcessor.output().send(MessageBuilder.withPayload(event).build(),
> > 500);
> >     }else{
> >         //XXX CANCEL ORDER
> >     }
> > }
> >
> >    Then ProductService consumes its own event in a Kafka Streams
> processor
> > to update the stock of the product in the ProductsStore.
> >
> > KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde,
> > jsonSerde, "orders");
> > stream.filter(...).groupByKey().reduce((...) -> {...}, "ProductsStock");
> >
> >    3.1) Suppose that in 1) two orders were placed simultaneously for the
> > same product and there is only stock for one of them
> >    3.2) ProductService would process the first one, the stock is ok and
> > would publish the ProductReserved event.
> >    3.3) We can't assure that ProductService will always process in the
> > Kafka Streams processor the order1 ProductReserved event to update
> > ProductsStore before the order2 OrderCreated is processed. Then in cases
> > ProductService will generate a ProductReserved for order2 incorrectly,
> > generating an inconsistency.
> >
> > IMPORTANT: You can find the detailed description, with code and the
> events
> > that are published and consumed in the previously referenced
> stackoverflow
> > question.
> >
> > After so much thinking and looking up online I haven't found a single
> place
> > where I can get a clear way to deal with Event Sourcing with Kafka+Kafka
> > Streams solving the problem of atomicity.
> >
> > I'd really appreciate if someone could propose a solution for this.
> >
> > Regards
> > Jose
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

Posted by Chris Richardson <ch...@chrisrichardson.net>.
Hi,

I like Kafka but I don't understand the claim that it can be used for Event
Sourcing (here <http://microservices.io/patterns/data/event-sourcing.html>
and here <https://martinfowler.com/eaaDev/EventSourcing.html>)

One part of the event sourcing is the ability to subscribe to events
published by aggregates and clearly Kafka works well there.

But the other part of Event Sourcing is "database" like functionality,
which includes

   - findEventsByPrimaryKey() - needed to be able to reconstruct an
   aggregate from its events - the essence of event sourcing
   - Atomic updates - for updating aggregates - (findEventsByPrimaryKey() -
   business logic - insertNewEvents()) in order to handle this kind of
   scenario (sketched as an interface below).
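
Read as an interface, that "database-like" half might look roughly like this
(a sketch only; the names and the expectedVersion parameter are illustrative,
not Eventuate's or Kafka's API):

import java.util.List;

// Sketch of the "database-like" half of an event store; all names are illustrative.
public interface EventStore {

    // Load the full event history of one aggregate so it can be replayed.
    List<DomainEvent> findEventsByPrimaryKey(String aggregateId);

    // Append new events only if the aggregate has not changed since it was read,
    // i.e. the read - business logic - write sequence is atomic
    // (optimistic concurrency via an expected version).
    void insertNewEvents(String aggregateId, long expectedVersion, List<DomainEvent> newEvents);

    // Marker for domain events; a real implementation would carry payloads.
    interface DomainEvent {}
}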

The approach we have taken is to implement event sourcing using a database
and Kafka.
For instance: see
https://blog.eventuate.io/2016/10/06/eventuate-local-event-sourcing-and-cqrs-with-spring-boot-apache-kafka-and-mysql/

Chris

-- 
Learn microservices - http://learnmicroservices.io
Microservices application platform http://eventuate.io


On Fri, Jul 21, 2017 at 12:25 AM, José Antonio Iñigo <
joseantonio.inigo@gmail.com> wrote:

> Hi everybody,
>
> I have been struggling with this problem for quite a while now, resorting
> to stackoverflow
> <https://stackoverflow.com/questions/45144429/event-
> sourcing-apache-kafka-kafka-streams-how-to-assure-atomicity-transa>
> for some help with no success. I am hoping to that here I'll get a more
> authoritative answer from experienced Kafka users.
>
> This is the summary of my problem:
>
> - I am developing an application based on Spring Boot Microservices for a
> shopping domain.
> - I want to use Event Sourcing, having Kafka to register the events and
> Kafka Streams API stores to materialize the views.
> - To simplify the scenario we'll consider only two domains: Orders and
> Products.
> - The conflicting part:
>    1) OrderService publishes an OrderPlaced event indicating a productId
> and the quantity.
>    2) ProductService consumes the event and queries (with an interactive
> query) its Kafka Streams Store (ProductsStore) to check the availability of
> the product. If there is availabilty it publishes a ProductReserved event
> with productId and quantity:
>
> if("OrderPlaced".equals(event.get("eventType"))){
>
>     Order order = new Order();
>     order.setId((String)event.get("orderId"));
>     order.setProductId((Integer)(event.get("productId")));
>     order.setUid(event.get("uid").toString());
>
>     // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
>     Integer productStock = getProductStock(order.getProductId());
>
>     if(productStock > 0) {
>         Map<String, Object> event = new HashMap<>();
>         event.put("name", "ProductReserved");
>         event.put("orderId", order.getId());
>         event.put("productId", order.getProductId());
>
>         // WRITES A PRODUCT RESERVED EVENT TO orders topic
>
> orderProcessor.output().send(MessageBuilder.withPayload(event).build(),
> 500);
>     }else{
>         //XXX CANCEL ORDER
>     }
> }
>
>    Then ProductService consumes its own event in a Kafka Streams processor
> to update the stock of the product in the ProductsStore.
>
> KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde,
> jsonSerde, "orders");
> stream.filter(...).groupByKey().reduce((...) -> {...}, "ProductsStock");
>
>    3.1) Suppose that in 1) two orders were placed simultaneously for the
> same product and there is only stock for one of them
>    3.2) ProductService would process the first one, the stock is ok and
> would publish the ProductReserved event.
>    3.3) We can't assure that ProductService will always process in the
> Kafka Streams processor the order1 ProductReserved event to update
> ProductsStore before the order2 OrderCreated is processed. Then in cases
> ProductService will generate a ProductReserved for order2 incorrectly,
> generating an inconsistency.
>
> IMPORTANT: You can find the detailed description, with code and the events
> that are published and consumed in the previously referenced stackoverflow
> question.
>
> After so much thinking and looking up online I haven't found a single place
> where I can get a clear way to deal with Event Sourcing with Kafka+Kafka
> Streams solving the problem of atomicity.
>
> I'd really appreciate if someone could propose a solution for this.
>
> Regards
> Jose
>