You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Sagar Rao (Jira)" <ji...@apache.org> on 2021/07/02 14:37:00 UTC

[jira] [Comment Edited] (KAFKA-8295) Optimize count() using RocksDB merge operator

    [ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373572#comment-17373572 ] 

Sagar Rao edited comment on KAFKA-8295 at 7/2/21, 2:36 PM:
-----------------------------------------------------------

[~ableegoldman], I did some more analysis and here's a summary of it:

1) the RocksDB merge operator is useful for both associative and partial merges. The examples as per the documentation is ranging from a simple counter, to appending values to a string/list or partial updates in a json document.

2) Here's where I feel it could be useful in Kafka Streams context:

2.1) One option could be we introduce a merge operator in StateStores interface. This merge operator could be used for incrementing a counter or appending value to an existing String/List. It could be useful as all the operations i described above are RMW operations. As per RocksDB documentation, random Get() is relatively slow, so that might be one advantage. Infact, this could be easily extended to in memory state stores as well so we aren't tied to Rockdb here.

The only downside I see is that looks like we can set only one mergeOperator to the options. So, we might want to set the correct MergerOperator based upon the values.

If you think such an operator is useful, I can do a benchmarking exercise by adding an interface and comparing the performance of the same counter example by RMW and merge() operator.

2) I also looked at the count() api in KGroupedStream and one of its implementation. As per my understanding, eventually, the logic resides in KStreamAggregate.process() method where it's still a RMW sequence:
{code:java}
final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
 T oldAgg = getValueOrNull(oldAggAndTimestamp);
final T newAgg;
 final long newTimestamp;
if (oldAgg == null)
{ oldAgg = initializer.apply(); newTimestamp = context().timestamp(); }
else
{ oldAgg = oldAggAndTimestamp.value(); newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp()); }
newAgg = aggregator.apply(key, value, oldAgg);
{code}

 
 and the aggregator is a countAggregator:
{code:java}
final Aggregator<K, V, Long> countAggregator = (aggKey, value, aggregate) -> aggregate + 1;
{code}
the merge API in C++ supports both an initial value when it and updating an existing value. The initial value is being handled here:
{code:java}
final Initializer<Long> countInitializer = () -> 0L
{code}
 

So, in theory this sequence could also be replaced but I will have to try it out. One thing to note is that, the value is ValueTimeStamp which I believe is not supported in the UInt64AddOperator. So, we might want a custom merge operator in c++ and then add it here and I am not sure of the performance :)

I can give it a try if needed.

WDYT? Option 1 is simpler so should we go ahead with that and if we get good results we can try option 2?


was (Author: sagarrao):
[~ableegoldman], I did some more analysis and here's a summary of it:

1) the RocksDB merge operator is useful for both associative and partial merges. The examples as per the documentation is ranging from a simple counter, to appending values to a string/list or partial updates in a json document.

2) Here's where I feel it could be useful in Kafka Streams context:

2.1) One option could be we introduce a merge operator in StateStores interface. This merge operator could be used for incrementing a counter or appending value to an existing String/List. It could be useful as all the operations i described above are RMW operations. As per RocksDB documentation, random Get() is relatively slow, so that might be one advantage. Infact, this could be easily extended to in memory state stores as well so we aren't tied to Rockdb here.

The only downside I see is that looks like we can set only one mergeOperator to the options. So, we might want to set the correct MergerOperator based upon the values.

If you think such an operator is useful, I can do a benchmarking exercise by adding an interface and comparing the performance of the same counter example by RMW and merge() operator.

2) I also looked at the count() api in KGroupedStream and one of its implementation. As per my understanding, eventually, the logic resides in KStreamAggregate.process() method where it's still a RMW sequence:

```
final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
            T oldAgg = getValueOrNull(oldAggAndTimestamp);

            final T newAgg;
            final long newTimestamp;

            if (oldAgg == null) {
                oldAgg = initializer.apply();
                newTimestamp = context().timestamp();
            } else {
                oldAgg = oldAggAndTimestamp.value();
                newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
            }

            newAgg = aggregator.apply(key, value, oldAgg);
```
and the aggregator is a countAggregator:

`final Aggregator<K, V, Long> countAggregator = (aggKey, value, aggregate) -> aggregate + 1;` 

the merge API in C++ supports both an initial value when it and updating an existing value. The initial value is being handled here:

`final Initializer<Long> countInitializer = () -> 0L;` 

So, in theory this sequence could also be replaced but I will have to try it out. One thing to note is that, the value is ValueTimeStamp which I believe is not supported in the UInt64AddOperator. So, we might want a custom merge operator in c++ and then add it here and I am not sure of the performance :) 

I can give it a try if needed.

WDYT? Option 1 is simpler so should we go ahead with that and if we get good results we can try option 2?







> Optimize count() using RocksDB merge operator
> ---------------------------------------------
>
>                 Key: KAFKA-8295
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8295
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Sagar Rao
>            Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, merge. This essentially provides an optimized read/update/write path in a single operation. One of the built-in (C++) merge operators exposed over the Java API is a counter. We should be able to leverage this for a more efficient implementation of count()
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general aggregations, even if RocksJava allowed for a custom merge operator, unless we provide a way for the user to specify and connect a C++ implemented aggregator – otherwise we incur too much cost crossing the jni for a net performance benefit)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)