Posted to jira@kafka.apache.org by "Sagar Rao (Jira)" <ji...@apache.org> on 2021/05/23 18:55:00 UTC

[jira] [Comment Edited] (KAFKA-8295) Optimize count() using RocksDB merge operator

    [ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350117#comment-17350117 ] 

Sagar Rao edited comment on KAFKA-8295 at 5/23/21, 6:54 PM:
------------------------------------------------------------

[~ableegoldman], I extended the RocksJava benchmarking code available on GitHub ([https://github.com/facebook/rocksdb/wiki/RocksJava-Performance-on-Flash-Storage]) and added a MergeRandomTask to it. The operator I benchmarked is UInt64AddOperator. Here is the benchmarking task that I added: [https://github.com/vamossagar12/rocksdb/blob/master/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java#L310-L346] (I hope you can view my forked repo).

 

This is how I plugged in the merge operator:

[https://github.com/vamossagar12/rocksdb/blob/master/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java#L645-L656]
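
For anyone who cannot open the fork, the counter semantics that a uint64-add style merge provides can be illustrated with a toy in-memory model. This is only a sketch: it is not the benchmark code and it does not call the RocksDB API. The class name CounterMergeSketch and its methods are hypothetical, the fixed-width little-endian encoding reflects my understanding of what the built-in operator expects, and the model folds operands eagerly, whereas RocksDB may defer the fold to read or compaction time.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of a uint64-add merge. Hypothetical names; NOT the RocksDB API.
public class CounterMergeSketch {
    private final Map<String, byte[]> store = new HashMap<>();

    // Encode a long as 8 bytes, least-significant byte first (assumed operand format).
    public static byte[] encodeFixed64(long value) {
        return ByteBuffer.allocate(Long.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(value)
                .array();
    }

    // Decode 8 little-endian bytes back into a long.
    public static long decodeFixed64(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }

    // The merge function: a missing value counts as 0, otherwise add the operand.
    public static byte[] uint64Add(byte[] existing, byte[] operand) {
        long base = (existing == null) ? 0L : decodeFixed64(existing);
        return encodeFixed64(base + decodeFixed64(operand));
    }

    // A single "merge" call replaces the caller-side get + add + put sequence.
    public void merge(String key, byte[] operand) {
        store.merge(key, operand, CounterMergeSketch::uint64Add);
    }

    // Read the current count for a key (0 if the key was never merged).
    public long count(String key) {
        byte[] value = store.get(key);
        return value == null ? 0L : decodeFixed64(value);
    }

    public static void main(String[] args) {
        CounterMergeSketch counts = new CounterMergeSketch();
        byte[] one = encodeFixed64(1L);
        for (String word : new String[] {"a", "b", "a", "a"}) {
            counts.merge(word, one); // count() would issue one merge per record
        }
        System.out.println("a=" + counts.count("a") + " b=" + counts.count("b"));
    }
}
```

The point of the sketch is that the caller never reads the old value: it only submits an increment, which is what makes the merge path attractive for count().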

 

I ran the tests on the following machine config:

*32 core, 256 GB RAM*

*CentOS: 7.7*

*DB created on Disk attached to the VM. Azure IOPS2 Volume Type => 7000.*

Here are the config parameters passed:

 

*bpl=10485760;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4; delay=8; stop=12; mbc=20; r=50000000; t=10; vs=8; bs=65536; si=1000000; time ./jdb_bench.sh --benchmarks=mergerandom --merge_operator=uint64add --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --db=/data/sdb --disable_wal=1 --stats_interval=$si --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --stats_per_interval=1 --max_bytes_for_level_base=$bpl*

 

I picked the configs above from the RocksDB C++ merge operator benchmarking page:

[https://github.com/facebook/rocksdb/wiki/Read-Modify-Write-Benchmarks] 

Note that Snappy compression didn't work because I couldn't link the library. I don't think that should be a deterrent for the benchmarking anyway. Finally, here are the numbers that I got:

####################################################################

*Running benchmark in 64-Bit mode.*

*Unable to load snappy library:java.lang.UnsatisfiedLinkError: no snappy in java.library.path*

*No compression is used.*

*Using database directory: /data/sdb*

*Keys:     16 bytes each*

*Values:   8 bytes each (4 bytes after compression)*

*Entries:  50000000*

*RawSize:  1144.4 MB (estimated)*

*FileSize:   953.7 MB (estimated)*

*Memtable Factory: SkipListFactory*

*Prefix:   0 bytes*

*Compression: none*

*------------------------------------------------*

*mergerandom      :     5.17497 micros/op;    4.4 MB/s; 50000000 ops done;  1 / 1 task(s) finished.*

*real 4m19.249s*

*user 16m18.119s*

*sys 0m17.834s*

####################################################################
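
As a sanity check, the summary lines above can be cross-checked with a bit of arithmetic. This is a rough sketch, assuming the reported micros/op is wall-clock time divided by the total number of operations across all threads, and that "MB" in the output means 2^20 bytes. The class and method names are made up for illustration.

```java
// Cross-checks the benchmark summary using only numbers quoted in the output above.
// Hypothetical helper; the unit assumptions are stated in the comments.
public class MergeBenchmarkMath {
    static final double MIB = 1024.0 * 1024.0; // assumption: "MB" means 2^20 bytes

    // Overall operations per second implied by an average latency in microseconds.
    public static double opsPerSecond(double microsPerOp) {
        return 1_000_000.0 / microsPerOp;
    }

    // Total run time in seconds implied by ops * micros/op.
    public static double totalSeconds(long ops, double microsPerOp) {
        return ops * microsPerOp / 1_000_000.0;
    }

    // Estimated data size in MiB for a given number of key/value entries.
    public static double sizeMiB(long entries, int keyBytes, int valueBytes) {
        return entries * (double) (keyBytes + valueBytes) / MIB;
    }

    // Throughput in MiB/s when each op writes one key plus one value.
    public static double throughputMiBPerSec(int keyBytes, int valueBytes, double microsPerOp) {
        return (keyBytes + valueBytes) * opsPerSecond(microsPerOp) / MIB;
    }

    // CPU seconds consumed per wall-clock second (user + sys over real).
    public static double cpuToWallRatio(double userSec, double sysSec, double realSec) {
        return (userSec + sysSec) / realSec;
    }

    public static void main(String[] args) {
        double microsPerOp = 5.17497;
        long ops = 50_000_000L;
        System.out.printf("ops/sec          : %.0f%n", opsPerSecond(microsPerOp));
        System.out.printf("total seconds    : %.1f%n", totalSeconds(ops, microsPerOp));
        System.out.printf("raw size (MiB)   : %.1f%n", sizeMiB(ops, 16, 8));
        System.out.printf("file size (MiB)  : %.1f%n", sizeMiB(ops, 16, 4));
        System.out.printf("throughput MiB/s : %.1f%n", throughputMiBPerSec(16, 8, microsPerOp));
        System.out.printf("cpu/wall ratio   : %.2f%n",
                cpuToWallRatio(16 * 60 + 18.119, 17.834, 4 * 60 + 19.249));
    }
}
```

Under those assumptions the figures line up: roughly 193K merges per second overall, about 258.7 seconds for 50M operations (close to the 4m19s real time), and size estimates of about 1144.4 and 953.7 for 24-byte and 20-byte entries respectively. The CPU-to-wall ratio comes out near 3.8, which suggests the 10 benchmark threads were far from saturating the 32 cores.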

What do you make of these numbers? Are there particular metrics that are traditionally looked at in the Kafka Streams landscape when making this kind of decision, or any particular configs I should use?

I should be able to factor those in and republish the numbers. Please let me know.

Based on your feedback, we can then decide whether the operator is worthy of a KIP.

 


> Optimize count() using RocksDB merge operator
> ---------------------------------------------
>
>                 Key: KAFKA-8295
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8295
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Sagar Rao
>            Priority: Major
>
> In addition to the regular put/get/delete, RocksDB provides a fourth operation, merge. This essentially provides an optimized read/update/write path in a single operation. One of the built-in (C++) merge operators exposed over the Java API is a counter. We should be able to leverage this for a more efficient implementation of count().
>  
> (Note: Unfortunately, it seems unlikely we can use this to optimize general aggregations, even if RocksJava allowed for a custom merge operator, unless we provide a way for the user to specify and connect a C++ implemented aggregator – otherwise we incur too much cost crossing the JNI boundary for a net performance benefit.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)