Posted to users@kafka.apache.org by Luigi Cerone <lu...@gmail.com> on 2021/11/02 17:29:02 UTC

Kafka streams event deduplication keeping last event in window

I'm using Kafka Streams to deduplicate events over short time windows
(<= 1 minute).
First, I tried to tackle the problem with the DSL API's
[`.suppress(Suppressed.untilWindowCloses(...))`](
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html)
operator but, since wall-clock time is not yet supported
(see [KIP-424](
https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time)),
this operator is not viable for my use case.

Then I followed this [official Confluent example](
https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html),
which uses the low-level Processor API. It works fine but has one major
limitation for my use case: the single event (obtained by deduplication) is
emitted at the **beginning** of the time window, and subsequent duplicate
events are "suppressed". I need the reverse: a single event should be
emitted at the **end** of the window.
I'm asking for suggestions on how to implement this with the Processor API.

My idea was to use the Processor API with a custom [Transformer](
https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html)
and a [Punctuator](
https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/streams/processor/Punctuator.html).
The transformer would store the distinct keys it receives in a [WindowStore](
https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/WindowStore.html)
without returning any KeyValue. At the same time, I'd schedule a punctuator
with an interval equal to the size of the window in the WindowStore. This
punctuator would iterate over the elements in the store and forward them
downstream.
The core parts of the logic are as follows:

DeduplicationTransformer (slightly modified from [official Confluent
example](
https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
):
```java
    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        eventIdStore = (WindowStore<E, V>) context.getStateStore(this.storeName);

        // Schedule punctuator for this transformer.
        context.schedule(
            Duration.ofMillis(this.windowSizeMs),
            PunctuationType.WALL_CLOCK_TIME,
            new DeduplicationPunctuator<E, V>(eventIdStore, context, this.windowSizeMs));
    }

    @Override
    public KeyValue<K, V> transform(final K key, final V value) {
        final E eventId = idExtractor.apply(key, value);
        if (eventId == null) {
            return KeyValue.pair(key, value);
        } else {
            if (!isDuplicate(eventId)) {
                rememberNewEvent(eventId, value, context.timestamp());
            }
            return null;
        }
    }
```

DeduplicationPunctuator:
```java
    public DeduplicationPunctuator(WindowStore<E, V> eventIdStore,
                                   ProcessorContext context,
                                   long retainPeriodMs) {
        this.eventIdStore = eventIdStore;
        this.context = context;
        this.retainPeriodMs = retainPeriodMs;
    }

    @Override
    public void punctuate(long invocationTime) {
        LOGGER.info("Punctuator invoked at {}, searching from {}",
            new Date(invocationTime), new Date(invocationTime - retainPeriodMs));

        KeyValueIterator<Windowed<E>, V> it =
            eventIdStore.fetchAll(invocationTime - retainPeriodMs, invocationTime + retainPeriodMs);

        while (it.hasNext()) {
            KeyValue<Windowed<E>, V> next = it.next();

            LOGGER.info("Punctuator running on {}", next.key.key());

            context.forward(next.key.key(), next.value);
            // Delete from store with tombstone
            eventIdStore.put(next.key.key(), null, invocationTime);
            context.commit();
        }

        it.close();
    }
```

Is this a valid approach?
With the code above, I'm running some integration tests and I'm seeing
synchronization issues. How can I be sure that the start of the window will
coincide with the Punctuator's scheduled interval?


As an alternative approach, I was wondering (I've googled with no result)
whether there is any event triggered by a window closing to which I can
attach a callback, in order to iterate over the store and publish only
distinct events.

Thanks.

My question is also here: https://stackoverflow.com/q/69725131/4837677

Re: Kafka streams event deduplication keeping last event in window

Posted by Luigi Cerone <lu...@gmail.com>.
Hello Matthias, thanks for your reply.

> Using a plain kv-store, whenever the punctuation runs you can find closed
> windows, forward the result and also delete the row explicitly, which gives
> you more control.


What is the best way to find closed windows? Have you got any examples?

Thanks! :)

On Wed, 3 Nov 2021 at 00:34, Matthias J. Sax <mj...@apache.org> wrote:

> I did not study your code snippet, but yes, it sounds like a valid
> approach from your description.
>
> > How can I be sure that the start of the window will
> > coincide with the Punctuator's scheduled interval?
>
> For punctuations, there is always some jitter, because it's not possible
> to run a punctuation at the exact point in time when it is scheduled to
> run. Thus, a punctuation might fire a little delayed anyway. You also
> cannot control the "anchor point" directly, because it depends on the
> point in time when you register the punctuation.
>
> Also note that a WindowStore is basically still a key-value store, i.e.,
> a single key-value pair models one window. The main difference is the
> timestamp, which is used to expire rows eventually; expired rows are
> simply dropped (without any notification).
>
> Thus, the only thing you can do is run the punctuation frequently enough
> to detect logically closed windows with low latency and forward the
> corresponding results.
>
> But you cannot really "bind" the punctuation to the state store
> expiration, and a window store also does not support deletes... Thus, I am
> wondering if using a plain key-value store might be more useful for your
> case? Using a plain kv-store, whenever the punctuation runs you can find
> closed windows, forward the result and also delete the row explicitly,
> which gives you more control.
>
> Hope this helps.
>
> -Matthias
>

Re: Kafka streams event deduplication keeping last event in window

Posted by "Matthias J. Sax" <mj...@apache.org>.
I did not study your code snippet, but yes, it sounds like a valid 
approach from your description.

> How can I be sure that the start of the window will
> coincide with the Punctuator's scheduled interval?

For punctuations, there is always some jitter, because it's not possible
to run a punctuation at the exact point in time when it is scheduled to
run. Thus, a punctuation might fire a little delayed anyway. You also
cannot control the "anchor point" directly, because it depends on the
point in time when you register the punctuation.
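
To illustrate the anchor-point issue: one way to avoid depending on when
the punctuation was registered is to derive window boundaries from record
timestamps and let the punctuation only check whether a window's end has
passed. The sketch below is a minimal illustration under that assumption;
`WindowBounds` and its methods are hypothetical helpers, not part of Kafka
Streams, and epoch-aligned tumbling windows are assumed.

```java
// Hypothetical helper (not a Kafka Streams class): epoch-aligned tumbling-window arithmetic.
final class WindowBounds {
    private WindowBounds() {}

    /** Start of the epoch-aligned window that contains the given timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Exclusive end of the window that contains the given timestamp. */
    static long windowEnd(long timestampMs, long windowSizeMs) {
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs;
    }

    /** A window counts as closed once the punctuation time has reached its end. */
    static boolean isClosed(long windowStartMs, long windowSizeMs, long punctuationTimeMs) {
        return punctuationTimeMs >= windowStartMs + windowSizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 60_000L;
        long recordTs = 125_000L;
        long start = windowStart(recordTs, sizeMs);
        System.out.println(start);                              // prints 120000
        System.out.println(windowEnd(recordTs, sizeMs));        // prints 180000
        // A punctuation that fires 250 ms late still sees the window as closed.
        System.out.println(isClosed(start, sizeMs, 180_250L));  // prints true
    }
}
```

With this kind of arithmetic, jitter only delays the moment a closed window
is noticed; it does not change which records belong to which window.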

Also note that a WindowStore is basically still a key-value store, i.e.,
a single key-value pair models one window. The main difference is the
timestamp, which is used to expire rows eventually; expired rows are
simply dropped (without any notification).

Thus, the only thing you can do is run the punctuation frequently enough
to detect logically closed windows with low latency and forward the
corresponding results.

But you cannot really "bind" the punctuation to the state store
expiration, and a window store also does not support deletes... Thus, I am
wondering if using a plain key-value store might be more useful for your
case? Using a plain kv-store, whenever the punctuation runs you can find
closed windows, forward the result and also delete the row explicitly,
which gives you more control.
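
As a rough illustration of that idea, the following sketch keeps the latest
value per event id and window, and emits and deletes closed windows on each
punctuation. It is plain Java with an in-memory `TreeMap` standing in for
the key-value store so that it runs without Kafka; `LastEventPerWindow`,
`process`, and `punctuate` are hypothetical names, and epoch-aligned
windows are an assumption.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for the kv-store pattern; all names are hypothetical.
final class LastEventPerWindow<K, V> {
    private final long windowSizeMs;
    // windowStart -> (eventId -> latest value); TreeMap keeps windows ordered by start.
    private final TreeMap<Long, Map<K, V>> store = new TreeMap<>();

    LastEventPerWindow(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Per record: overwrite, so only the latest value per id and window is kept. */
    void process(K eventId, V value, long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        store.computeIfAbsent(start, s -> new LinkedHashMap<>()).put(eventId, value);
    }

    /** Per punctuation: emit and delete every window whose end is <= nowMs. */
    List<Map.Entry<K, V>> punctuate(long nowMs) {
        List<Map.Entry<K, V>> emitted = new ArrayList<>();
        // Windows with start <= nowMs - windowSizeMs have already ended.
        Iterator<Map<K, V>> closed =
            store.headMap(nowMs - windowSizeMs, true).values().iterator();
        while (closed.hasNext()) {
            for (Map.Entry<K, V> e : closed.next().entrySet()) {
                emitted.add(new AbstractMap.SimpleImmutableEntry<>(e)); // "forward"
            }
            closed.remove(); // explicit delete of the emitted window
        }
        return emitted;
    }

    public static void main(String[] args) {
        LastEventPerWindow<String, String> dedup = new LastEventPerWindow<>(60_000L);
        dedup.process("a", "v1", 1_000L);
        dedup.process("a", "v2", 2_000L);   // duplicate id in the same window
        dedup.process("b", "x", 61_000L);   // belongs to the next window
        System.out.println(dedup.punctuate(60_500L)); // prints [a=v2]
    }
}
```

In a real topology, the map would presumably be replaced by a
`KeyValueStore` whose key combines the window start and the event id, with
`all()` or `range()` used to scan and `delete()` to remove emitted rows;
that mapping is not shown here.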

Hope this helps.

-Matthias
