Posted to users@kafka.apache.org by Ofir Sharony <of...@myheritage.com> on 2017/03/01 12:34:18 UTC

Kafka Streams - ordering grouped messages

Hi,

I have the following code on a stream:

.selectKey(...)
.groupByKey(...)
.reduce(...)

The records arrive at the Reducer function in the same order they were
consumed from Kafka.
I have implemented a TimestampExtractor that extracts the wanted timestamp
from each record, but unfortunately this didn't have any effect on the
order in which the messages were received in the Reducer.

Any thoughts on that?
Thanks,

*Ofir Sharony*
BackEnd Tech Lead

Mobile: +972-54-7560277 | ofir.sharony@myheritage.com | www.myheritage.com
MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel

<http://www.myheritage.com/>

<https://www.facebook.com/myheritage>
<https://twitter.com/myheritage>         <http://blog.myheritage.com/>
    <https://www.youtube.com/user/MyHeritageLtd>

Re: Kafka Streams - ordering grouped messages

Posted by Damian Guy <da...@gmail.com>.
Hi Ofir,

My advice is to handle the duplicates. As you said, compaction only runs on
the non-active segments, so there could be duplicates in the active segment.
Further, even after compaction has run there could still be duplicates.
You can attempt to minimize the occurrence of duplicates by adjusting the
segment size of the topic(s) in question. With a smaller segment size,
segments roll more often, so compaction gets a chance to run more
frequently; however, this also means you'll have more files.

Thanks,
Damian
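
As a rough illustration of "handle the duplicates" on the consuming side,
here is a minimal plain-Java sketch. It does not use any Kafka API; the
class LatestPerKeyStore and its methods are hypothetical names, and it
assumes each record carries a timestamp that can be compared per key. A
record with the same or an older timestamp than the stored one is treated
as a duplicate and ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical downstream sink that tolerates duplicate and stale records.
final class LatestPerKeyStore {
    private static final class Versioned {
        final long timestampMs;
        final String value;

        Versioned(long timestampMs, String value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    private final Map<String, Versioned> latest = new HashMap<>();

    // Returns true if the record changed the stored state, false if it was a
    // duplicate or not newer than what is already stored for the key.
    boolean upsert(String key, long timestampMs, String value) {
        Versioned current = latest.get(key);
        if (current != null && current.timestampMs >= timestampMs) {
            return false;
        }
        latest.put(key, new Versioned(timestampMs, value));
        return true;
    }

    // Returns the latest value for the key, or null if the key is unknown.
    String get(String key) {
        Versioned v = latest.get(key);
        return v == null ? null : v.value;
    }

    public static void main(String[] args) {
        LatestPerKeyStore store = new LatestPerKeyStore();
        System.out.println(store.upsert("user-1", 100L, "created")); // prints true
        System.out.println(store.upsert("user-1", 100L, "created")); // prints false
        System.out.println(store.upsert("user-1", 200L, "updated")); // prints true
        System.out.println(store.get("user-1"));                     // prints updated
    }
}
```

Because applying the same record twice leaves the state unchanged, it does
not matter whether compaction has already removed the older copies.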


Re: Kafka Streams - ordering grouped messages

Posted by Ofir Sharony <of...@myheritage.com>.
Thanks guys,

I would like to continue where we stopped (late arriving records):

As I understand it, the best practice for handling late-arriving records is
to enable Kafka log compaction, thus keeping only the latest record for a
given key.
As log compaction only does its magic on non-active segments, I'm
trying to understand what the best approach is if I want to send my
data downstream in real time.

Would you advise planning my downstream apps to handle these key
duplications, or is there a way to remove them in real time or close to it
(let's say within 1 minute)?


Re: Kafka Streams - ordering grouped messages

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Just wanted to add that there is always the potential for late-arriving
records, and thus ordering by timestamp will never be perfect...

You should rather try to design your application so that it
can handle out-of-order data gracefully, and try to avoid the need
to order records by timestamp.


-Matthias
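
One way to read "handle out-of-order data gracefully" is to make the
reduce function itself insensitive to arrival order. The sketch below is
plain Java, not Kafka Streams code; the Event type, its fields, and the
LatestWins class are assumptions made for illustration. It assumes the
event-time timestamp travels inside the value, and always keeps the event
with the larger timestamp.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical event type: the payload carries its own event-time timestamp.
final class Event {
    final String key;
    final long timestampMs;
    final String value;

    Event(String key, long timestampMs, String value) {
        this.key = key;
        this.timestampMs = timestampMs;
        this.value = value;
    }
}

final class LatestWins {
    // Commutative and associative, so the result does not depend on the
    // order in which events arrive. Ties on timestamp are broken by value
    // to keep the outcome deterministic.
    static final BinaryOperator<Event> REDUCER = (a, b) -> {
        if (a.timestampMs != b.timestampMs) {
            return a.timestampMs > b.timestampMs ? a : b;
        }
        return a.value.compareTo(b.value) >= 0 ? a : b;
    };

    static Event reduce(List<Event> events) {
        return events.stream().reduce(REDUCER).orElseThrow(IllegalStateException::new);
    }

    public static void main(String[] args) {
        Event created = new Event("user-1", 100L, "created");
        Event deleted = new Event("user-1", 300L, "deleted");
        Event updated = new Event("user-1", 200L, "updated");

        // Same result whatever the arrival order.
        System.out.println(reduce(Arrays.asList(created, deleted, updated)).value); // prints deleted
        System.out.println(reduce(Arrays.asList(deleted, updated, created)).value); // prints deleted
    }
}
```

The same two-argument comparison could presumably serve as the body of a
Reducer passed to reduce(...), which would remove the need to sort records
before they reach it.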



Re: Kafka Streams - ordering grouped messages

Posted by Damian Guy <da...@gmail.com>.
You could implement your own sorting algorithm using the low-level
Processor API, i.e., you have a processor that keeps a sorted list of
records and then periodically, perhaps on punctuate, emits the sorted
messages downstream. You could do something like:

    builder.stream("topic").transform(new TransformerSupplier() {

        @Override
        public Transformer get() {
            return new TheTransformer();
        }
    }).groupByKey().reduce(..);

Where TheTransformer might look something like:

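// Needs java.util.Map, java.util.TreeMap,
// org.apache.kafka.streams.kstream.Transformer and
// org.apache.kafka.streams.processor.ProcessorContext.
private static class TheTransformer<K, V, R> implements Transformer<K, V, R> {
    private ProcessorContext context;
    // Buffer for records seen since the last punctuate. A TreeMap iterates in
    // key order and keeps one value per key, so choose the map key accordingly.
    private final TreeMap<K, V> sorted = new TreeMap<>();

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        context.schedule(1000); // punctuate every 1 second of streams-time
    }

    @Override
    public R transform(final K key, final V value) {
        // do stuff
        sorted.put(key, value);
        return null; // emit nothing here; buffered records are forwarded in punctuate
    }

    @Override
    public R punctuate(final long timestamp) {
        // Forward the buffered records in sorted order, then reset the buffer.
        for (final Map.Entry<K, V> kvEntry : sorted.entrySet()) {
            context.forward(kvEntry.getKey(), kvEntry.getValue());
        }
        sorted.clear();
        return null;
    }

    @Override
    public void close() {
    }
}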
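
The buffering idea can also be sketched without the Kafka Streams API. The
plain-Java model below is only an illustration: SortingBuffer, add, and
flush are hypothetical names, and the buffer is keyed by timestamp (with a
list per timestamp) on the assumption that the goal is timestamp order
rather than key order. flush() plays the role of punctuate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a sort-then-flush buffer; not Kafka Streams code.
final class SortingBuffer<V> {
    // Timestamp -> all values seen with that timestamp, so records with
    // equal timestamps do not overwrite each other.
    private final TreeMap<Long, List<V>> byTimestamp = new TreeMap<>();

    void add(long timestampMs, V value) {
        byTimestamp.computeIfAbsent(timestampMs, t -> new ArrayList<>()).add(value);
    }

    // Returns the buffered values in ascending timestamp order and empties
    // the buffer.
    List<V> flush() {
        List<V> out = new ArrayList<>();
        for (Map.Entry<Long, List<V>> entry : byTimestamp.entrySet()) {
            out.addAll(entry.getValue());
        }
        byTimestamp.clear();
        return out;
    }

    public static void main(String[] args) {
        SortingBuffer<String> buffer = new SortingBuffer<>();
        buffer.add(300L, "c");
        buffer.add(100L, "a");
        buffer.add(200L, "b");
        System.out.println(buffer.flush()); // prints [a, b, c]

        // A record that arrives after the flush cannot be moved ahead of
        // what was already emitted.
        buffer.add(50L, "late");
        System.out.println(buffer.flush()); // prints [late]
    }
}
```

The second flush shows the limit of this approach: ordering holds only
within one buffering interval, so a longer interval trades latency for
fewer out-of-order records.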






Re: Kafka Streams - ordering grouped messages

Posted by Ofir Sharony <of...@myheritage.com>.
Is there any way to sort grouped records before sending them to the reducer?


Re: Kafka Streams - ordering grouped messages

Posted by Damian Guy <da...@gmail.com>.
Hi,

The TimestampExtractor won't affect the order in which the records arrive.
It just provides a way for developers to use a timestamp other than the
default.

Thanks,
Damian
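
To make the distinction concrete, here is a small plain-Java model (not
the Kafka Streams interface; ExtractorModel and the "<value>:<timestamp>"
record format are made up for illustration). The extractor only attaches a
timestamp to each record; the loop still visits records in the order they
were read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

final class ExtractorModel {
    // Processes records in arrival order; the extractor labels each record
    // with a timestamp but never changes the iteration order.
    static List<String> process(List<String> recordsInOffsetOrder,
                                ToLongFunction<String> extractor) {
        List<String> processed = new ArrayList<>();
        for (String record : recordsInOffsetOrder) {
            long timestampMs = extractor.applyAsLong(record);
            processed.add(record + "@" + timestampMs);
        }
        return processed;
    }

    public static void main(String[] args) {
        // Hypothetical record format: "<value>:<embedded timestamp>"
        ToLongFunction<String> embedded =
                r -> Long.parseLong(r.substring(r.indexOf(':') + 1));
        System.out.println(process(Arrays.asList("b:200", "a:100", "c:300"), embedded));
        // prints [b:200@200, a:100@100, c:300@300]
    }
}
```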
