Posted to dev@kafka.apache.org by Mathieu Fenniak <ma...@replicon.com> on 2016/12/04 04:13:55 UTC

Suppressing redundant KTable forwards

Hey all,

I'd like to contribute a new KTable API that would allow for the
suppression of redundant KTable forwards, and I'd like to solicit feedback
before I put together a patch.

A typical use-case of this API would be that you're using mapValues to
pluck a subset of data out of a topic, and you'd like changes to the record
value that don't modify the output of mapValues to be suppressed, so that
they don't trigger expensive and redundant recalculations downstream.

For example, topic "user" contains key:1, value:{"firstName": "Jack",
"lastName": "Brown"}.  builder.table("user").mapValues((user) ->
user.get("lastName")) creates a KTable that forwards every update from
the user topic, even when lastName has not changed.

My proposed API would be to add a filterRedundant method to KTable; one
overload takes a Comparator<V> to provide a custom comparison for
evaluating whether a change is redundant, and a parameterless overload
uses a comparator backed by the object's equals() method.

    /**
     * Creates a new instance of {@link KTable} that filters out redundant
     * updates and prevents "non-updates" from propagating to further
     * operations on the returned table.  A redundant update is one where
     * the same value is provided more than once for a given key.
     * {@link Object#equals(Object)} is used to compare whether a
     * subsequent update has the same value.
     *
     * @return a {@link KTable} that contains the same values as this
     *         table, but suppresses redundant updates
     */
    KTable<K, V> filterRedundant();

    /**
     * Creates a new instance of {@link KTable} that filters out redundant
     * updates and prevents "non-updates" from propagating to further
     * operations on the returned table.  A redundant update is one where
     * the same value is provided more than once for a given key.  A
     * user-provided comparator is used to compare whether a subsequent
     * update has the same value.
     *
     * @param comparator used to decide whether a subsequent update has
     *                   the same value as the previous one
     * @return a {@link KTable} that contains the same values as this
     *         table, but suppresses redundant updates
     */
    KTable<K, V> filterRedundant(Comparator<V> comparator);
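
To make the intended semantics concrete, here is a minimal sketch in plain
Java with no Kafka Streams dependency.  The class name RedundantFilter and
its accept method are hypothetical and are not part of the proposed patch
or of any Kafka API.  The sketch only models the rule "forward an update
when the comparator reports a difference from the last value seen for that
key", and it assumes non-null values (deletes are not handled).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed behaviour; not Kafka Streams code.
final class RedundantFilter<K, V> {
    // Last value seen per key (assumption: values are never null).
    private final Map<K, V> lastSeen = new HashMap<>();
    private final Comparator<V> comparator;

    RedundantFilter(Comparator<V> comparator) {
        this.comparator = comparator;
    }

    // Returns true when the update should be forwarded downstream.
    boolean accept(K key, V value) {
        boolean known = lastSeen.containsKey(key);
        V previous = lastSeen.put(key, value);
        // The first value for a key is always forwarded; later values are
        // forwarded only if the comparator says they differ.
        return !known || comparator.compare(previous, value) != 0;
    }

    public static void main(String[] args) {
        RedundantFilter<Integer, String> filter =
                new RedundantFilter<>(Comparator.naturalOrder());
        List<String> forwarded = new ArrayList<>();
        // lastName values that mapValues would produce for key 1 over time.
        for (String lastName : new String[] {"Brown", "Brown", "Smith", "Smith"}) {
            if (filter.accept(1, lastName)) {
                forwarded.add(lastName);
            }
        }
        System.out.println(forwarded); // prints [Brown, Smith]
    }
}
```

With the "user" example above, a change to firstName alone would map to
the same lastName and so would not be forwarded.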

Re: Suppressing redundant KTable forwards

Posted by Damian Guy <da...@gmail.com>.
Hi Mathieu,

You are correct in that the de-duping only occurs within the commit
interval.
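
As a rough illustration of why that is not enough for this use-case, the
following is a deliberately simplified model of a per-key cache that is
flushed on commit.  The class CommitIntervalCache and its methods are
hypothetical names for this sketch only; the real cache has other flush
triggers as well (for example, running out of buffer space), which are
ignored here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of a per-key cache flushed on commit.
final class CommitIntervalCache<K, V> {
    // Latest un-flushed value per key; later puts overwrite earlier ones.
    private final Map<K, V> dirty = new LinkedHashMap<>();
    private final List<String> forwarded = new ArrayList<>();

    void put(K key, V value) {
        dirty.put(key, value);
    }

    // Forwards one record per dirty key, then empties the cache.
    void commit() {
        for (Map.Entry<K, V> entry : dirty.entrySet()) {
            forwarded.add(entry.getKey() + "=" + entry.getValue());
        }
        dirty.clear();
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        CommitIntervalCache<Integer, String> cache = new CommitIntervalCache<>();
        cache.put(1, "Brown");
        cache.put(1, "Brown");
        cache.commit(); // two updates in one interval: forwarded once
        cache.put(1, "Brown");
        cache.commit(); // same value in a later interval: forwarded again
        System.out.println(cache.forwarded()); // prints [1=Brown, 1=Brown]
    }
}
```

The cache collapses updates by key within an interval, but it never
compares values, so an unchanged value that arrives after a flush is still
forwarded.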

I can understand and appreciate the use-case you have. So I think the right
approach for this is to create a KIP with your suggested changes and put it
to the community. Are you happy to do that?

Thanks,
Damian

On Sun, 4 Dec 2016 at 15:20 Mathieu Fenniak <ma...@replicon.com>
wrote:

> Hi Eno,
>
> I'm not sure.  My understanding is that the cache would prevent two
> immediate updates for the same key from being forwarded, but that only
> applies when records arrive within commit.interval.ms of each other.  Is
> that understanding correct?
>
> filterRedundant compares the newValue and oldValue in a Change, so it
> works regardless of the time between records.
>
> https://github.com/apache/kafka/compare/trunk...mfenniak:filter-redundant
>
>
> The use-case that is currently kicking me is a piece of source data that
> contains multiple unrelated configuration fields in a single record; it's
> not a great design, but it's the data I have to work with.  I'm plucking
> out only a single relevant field with mapValues, but changes to the other
> fields within the record are causing excessive, expensive recomputations
> that are redundant.
>
> Mathieu

Re: Suppressing redundant KTable forwards

Posted by Mathieu Fenniak <ma...@replicon.com>.
Hi Eno,

I'm not sure.  My understanding is that the cache would prevent two
immediate updates for the same key from being forwarded, but that only
applies when records arrive within commit.interval.ms of each other.  Is
that understanding correct?

filterRedundant compares the newValue and oldValue in a Change, so it
works regardless of the time between records.

https://github.com/apache/kafka/compare/trunk...mfenniak:filter-redundant
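
The comparison described above can be sketched without any Kafka
dependency.  This is not the code on the linked branch: the Change class
below is a local stand-in that only mirrors the newValue/oldValue pair,
and RedundancyCheck.isRedundant is a hypothetical helper.  The null
handling (an insert or delete is never redundant) is an assumption of the
sketch.

```java
import java.util.Comparator;

// Hypothetical helper; illustrates a stateless old-vs-new comparison.
final class RedundancyCheck {
    static <V> boolean isRedundant(Change<V> change, Comparator<V> comparator) {
        if (change.oldValue == null || change.newValue == null) {
            // Assumption: a missing side means an insert or a delete, which
            // must be forwarded; only "both missing" counts as a non-update.
            return change.oldValue == null && change.newValue == null;
        }
        return comparator.compare(change.oldValue, change.newValue) == 0;
    }

    public static void main(String[] args) {
        Comparator<String> byValue = Comparator.naturalOrder();
        System.out.println(isRedundant(new Change<>("Brown", "Brown"), byValue)); // prints true
        System.out.println(isRedundant(new Change<>("Smith", "Brown"), byValue)); // prints false
        System.out.println(isRedundant(new Change<>("Brown", null), byValue));    // prints false
    }
}

// Local stand-in for an old/new value pair; not the Kafka class.
final class Change<V> {
    final V newValue;
    final V oldValue;

    Change(V newValue, V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}
```

Because the decision uses only the two values carried by the change, it
does not matter whether the records arrived milliseconds or days apart.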


The use-case that is currently kicking me is a piece of source data that
contains multiple unrelated configuration fields in a single record; it's
not a great design, but it's the data I have to work with.  I'm plucking
out only a single relevant field with mapValues, but changes to the other
fields within the record are causing excessive, expensive recomputations
that are redundant.

Mathieu


On Sun, Dec 4, 2016 at 4:34 AM, Eno Thereska <en...@gmail.com> wrote:

> Hi Mathieu,
>
> Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some
> of this for you, in that it dedups records with the same key and prevents
> them from being forwarded downstream?
>
> Eno

Re: Suppressing redundant KTable forwards

Posted by Eno Thereska <en...@gmail.com>.
Hi Mathieu,

Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some of this for you, in that it dedups records with the same key and prevents them from being forwarded downstream?

Eno