Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/31 14:43:25 UTC
[GitHub] [kafka] vamossagar12 opened a new pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 opened a new pull request #10798:
URL: https://github.com/apache/kafka/pull/10798
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*
*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703333661
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
> @vamossagar12 great to see you're about to add direct buffer support, I was actually also looking into this. Just a few comments: as pointed out already, one important aspect to keep in mind with direct buffers is that allocation and disposal are more expensive than with regular heap buffers, so it's important to reuse them. It's common practice to keep a pool of direct buffers around and reuse them on a per-call basis (most I/O libraries do this, e.g., Netty, Crail, etc.). You can keep queues of free and used buffers and move them around during state store operations.
Thanks @patrickstuedi , that's a great suggestion. I think it makes sense given the expense in allocation/disposal. Couple of questions there:
1) I think the main factor to consider there would be how to manage the pool of used/free buffers. As @cadonna already pointed out above, the only case where we can expect concurrent access to state stores is IQ; otherwise access is single-threaded. The single-threaded case is straightforward, but for IQ, what happens if a request comes in and none of the direct byte buffers are `free`? Do we allocate a new one on the fly?
2) This might be a small issue, but calling it out: we need to allocate some capacity to each direct byte buffer. Currently, I am iterating over the list of keys and values and using the maximum length, which again may not be the most efficient way. If I were to create the pool, I would need to know the capacity to allocate upfront. Could this be exposed as a config for users in that case?
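A minimal sketch of the free-buffer pool discussed above, assuming a single fixed buffer capacity (the hypothetical config from question 2) and allocation on demand when the pool is empty (question 1). `DirectBufferPool` and its methods are illustrative names, not Kafka Streams APIs; borrowed buffers are simply held by the caller rather than tracked in a separate "used" queue, and entries larger than the capacity would still need separate handling.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical pool of equally sized direct buffers (not a Kafka Streams API).
final class DirectBufferPool {
    private final int bufferCapacity; // assumed to come from a hypothetical user config
    private final int maxPooled;      // upper bound on idle buffers kept around
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    DirectBufferPool(int bufferCapacity, int initialBuffers, int maxPooled) {
        this.bufferCapacity = bufferCapacity;
        this.maxPooled = maxPooled;
        for (int i = 0; i < initialBuffers; i++) {
            free.push(ByteBuffer.allocateDirect(bufferCapacity));
        }
    }

    // Borrows a cleared buffer; if none is free (e.g. a burst of IQ calls), allocates one on the fly.
    synchronized ByteBuffer acquire() {
        ByteBuffer buffer = free.poll();
        if (buffer == null) {
            buffer = ByteBuffer.allocateDirect(bufferCapacity);
        }
        buffer.clear();
        return buffer;
    }

    // Returns a buffer; buffers beyond maxPooled are dropped and left for the GC to reclaim.
    synchronized void release(ByteBuffer buffer) {
        if (free.size() < maxPooled && buffer.isDirect() && buffer.capacity() == bufferCapacity) {
            free.push(buffer);
        }
    }

    synchronized int freeCount() {
        return free.size();
    }
}
```

The `synchronized` methods are only there for the IQ case; a store accessed solely by its stream thread could drop them.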
[GitHub] [kafka] vamossagar12 edited a comment on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 edited a comment on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-866823035
@guozhangwang / @cadonna I made some tweaks to the code and also started testing with 1M keys. Now I see throughput improvements for both range and putAll queries of around 0.3 ops/s and 0.15 ops/s respectively.
Here is the comparison:
```
testPersistentRangeQueryPerformance original
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance thrpt 15 1.131 ± 0.028 ops/s
testPersistentPutAllPerformance original
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance thrpt 15 0.919 ± 0.037 ops/s
testPersistentRangeQueryPerformance bytebuffer
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance thrpt 15 1.442 ± 0.038 ops/s
testPersistentPutAllPerformance bytebuffer
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance thrpt 15 1.065 ± 0.041 ops/s
```
I needed to set the ByteOrder after creating the DirectByteBuffer object. And for putAll, I needed to call flip() before calling `put` on the batch.
My next step is to create a Kafka Streams app and test throughput. I will post the results here once I have them.
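A sketch of the put/flip sequence described above, using only `java.nio`. The byte order chosen here (`ByteOrder.nativeOrder()`) is an assumption, since the comment does not say which order the PR sets; `DirectCopy.toDirect` is a hypothetical helper, not the PR's code.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class DirectCopy {
    private DirectCopy() { }

    // Copies a serialized key or value into a direct buffer that is ready to be read.
    static ByteBuffer toDirect(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length)
                .order(ByteOrder.nativeOrder()); // assumed order; affects multi-byte accessors like getInt()
        buffer.put(bytes); // position is now bytes.length, so remaining() == 0
        buffer.flip();     // limit = bytes.length, position = 0: a reader sees the whole entry
        return buffer;
    }
}
```

Without the `flip()`, a consumer reading from the current position would see zero remaining bytes, i.e. an empty entry.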
[GitHub] [kafka] cadonna commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644552319
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
Yes, I had the same thought as @guozhangwang, but I am also not familiar with direct buffers.
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
The [javadocs](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html) say
> The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers.
So it seems allocating a direct buffer during each put might decrease performance. The internal benchmarks we ran on this PR did not show any increase in throughput, but rather a decrease. However, I do not think the decrease was significant, so the cause could also just be a bad day for the environment. Anyway, continuously allocating direct buffers does not seem to be a good idea, as the javadocs also say:
> It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance.
Another thing to consider:
> The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious.
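Because direct buffers live outside the heap, heap-based GC metrics will not show them. One way to observe them, sketched here assuming the standard `java.lang.management` beans are available, is the platform `BufferPoolMXBean` named `"direct"`. The JVM flag `-XX:MaxDirectMemorySize` caps this pool, and exceeding it fails with `OutOfMemoryError: Direct buffer memory`. `DirectMemoryProbe` is a hypothetical helper name.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

final class DirectMemoryProbe {
    private DirectMemoryProbe() { }

    // Total capacity in bytes of all live direct buffers, as reported by the JVM; -1 if not found.
    static long directCapacityBytes() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getTotalCapacity();
            }
        }
        return -1L;
    }
}
```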
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
The only case I can think of that might involve high concurrency on a RocksDB state store is interactive queries. Without interactive queries there is no concurrency on the state stores, since only the stream thread that has been assigned the stateful task owning the state store accesses it.
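Following that reasoning, one possible (hypothetical) design is to let only the owning stream thread reuse a shared scratch buffer, while any other thread, such as an interactive-query caller, gets a short-lived buffer of its own. This sketch assumes the owning thread is known when the helper is created; `OwnerThreadScratch` is an illustrative name, not existing Kafka code.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: fast path for the owning stream thread, fallback for everyone else.
final class OwnerThreadScratch {
    private final Thread owner;      // assumed: the stream thread owning the store's task
    private final ByteBuffer scratch;

    OwnerThreadScratch(Thread owner, int capacity) {
        this.owner = owner;
        this.scratch = ByteBuffer.allocateDirect(capacity);
    }

    ByteBuffer bufferFor(int size) {
        if (Thread.currentThread() == owner && size <= scratch.capacity()) {
            scratch.clear();
            scratch.limit(size);
            return scratch; // reused: no allocation on the single-threaded hot path
        }
        // IQ threads (or oversized entries) get their own buffer, so they never race on scratch.
        return ByteBuffer.allocateDirect(size);
    }
}
```

The trade-off is that IQ calls still pay the allocation cost described in the javadocs, but they never share the owner's buffer.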
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
Yes, I think you should try to experiment with putAll()/range/reverseRange/prefixSeek operations as you proposed with a simple Kafka Streams app. That would be great to better understand the potential of direct buffers for Kafka Streams. Maybe experiment also with different key and value sizes. I am curious if we will also get such improvements.
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-917118246
Hi @vamossagar12, thanks again for your patience! I know this has been dragging on for a while. I had some discussions with @cadonna and @vamossagar12 about how we can move forward and eventually merge your work into Apache Kafka. As a result, I created this ticket summarizing what we want to do (it would incorporate this JIRA ticket):
https://issues.apache.org/jira/browse/KAFKA-13286
At the moment, our main concerns are that:
1) Direct byte buffers, which rely on native memory, are not fully managed by the JVM (GC), so we need to be very careful about their side effects on memory pressure.
2) The current implementation, without API changes to the serdes, gives a relatively small throughput improvement; and to use byte buffers without API changes, we would also need to "guess" the allocated size before serialization.
So we think it's probably better to tackle the larger scope of https://issues.apache.org/jira/browse/KAFKA-13286 as a whole, which would mean holding off on merging this PR until the serde / byte-buffer supplier work is in place. Does that sound reasonable to you? Would love to hear your thoughts.
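One common way to avoid guessing the size up front is to start with an estimate and grow the buffer when serialization overflows it. The sketch below is hypothetical (`GrowingSerializer` is not a Kafka API, and KAFKA-13286 may take a different approach); its cost is that an oversized value is serialized more than once.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;

final class GrowingSerializer {
    private ByteBuffer buffer;

    GrowingSerializer(int initialCapacityGuess) {
        this.buffer = ByteBuffer.allocateDirect(Math.max(1, initialCapacityGuess));
    }

    // Serializes value with writer; on overflow, doubles the buffer and retries from scratch.
    // Returns a flipped buffer, valid until the next call.
    <T> ByteBuffer serialize(T value, BiConsumer<T, ByteBuffer> writer) {
        while (true) {
            buffer.clear();
            try {
                writer.accept(value, buffer);
                buffer.flip();
                return buffer;
            } catch (BufferOverflowException e) {
                buffer = ByteBuffer.allocateDirect(Math.multiplyExact(buffer.capacity(), 2));
            }
        }
    }
}
```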
[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703312659
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
Also, we should try to avoid serializing data into byte[] arrays and then copy the data into directBuffers. Instead we should serialize directly into "direct" ByteBuffers. For this we might need to have RocksDBStore implement a ByteBuffer interface, e.g., KeyValueStore<Bytes, ByteBuffer>, or anything similar...
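A sketch of what serializing straight into a caller-supplied buffer could look like. `DirectSerializer` is a hypothetical interface: Kafka's existing `Serializer#serialize` returns a `byte[]`, which is exactly the extra copy this suggestion wants to avoid. The string variant encodes with a `CharsetEncoder` directly into the target buffer.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

// Hypothetical serializer that writes into a caller-supplied (e.g. direct) buffer.
interface DirectSerializer<T> {
    void serializeInto(T value, ByteBuffer target);
}

final class DirectSerializers {
    private DirectSerializers() { }

    // 8 bytes, in the target buffer's byte order.
    static final DirectSerializer<Long> LONG = (value, target) -> target.putLong(value);

    // UTF-8, encoded straight into the target without an intermediate byte[].
    // On overflow, a partial encoding may already have been written to the target.
    static final DirectSerializer<String> STRING = (value, target) -> {
        CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder(); // encoders are not thread-safe
        CoderResult result = encoder.encode(CharBuffer.wrap(value), target, true);
        if (result.isOverflow()) {
            throw new BufferOverflowException();
        }
        if (result.isError()) {
            throw new IllegalArgumentException("Cannot encode value as UTF-8: " + result);
        }
        encoder.flush(target);
    };
}
```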
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-876082875
@guozhangwang, @cadonna did you get a chance to look at the numbers? Are there any tweaks or other tests you would suggest?
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703469853
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
I see. I think that, barring interactive queries, all other accesses are single-threaded.
Regarding the max size, TBH I am not sure.
Also, regarding changing the API: yeah, the RocksDB store isn't exposed, so the new APIs will have to be in KeyValueStore with a default implementation.
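Adding a new overload as a Java `default` method keeps existing store implementations compiling while letting a RocksDB-backed store override it. The interface below is a simplified, hypothetical stand-in for `KeyValueStore`, not its real signature.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for a key-value store interface (not Kafka's KeyValueStore).
interface SimpleStore {
    void put(byte[] key, byte[] value);

    byte[] get(byte[] key);

    // New overload with a default body, so existing implementations keep compiling.
    // A RocksDB-backed store could override it to hand direct buffers to RocksDB as-is.
    default void put(ByteBuffer key, ByteBuffer value) {
        put(toArray(key), toArray(value));
    }

    // Copies the remaining bytes; duplicate() leaves the caller's position untouched.
    static byte[] toArray(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);
        return bytes;
    }
}

// An existing byte[]-only implementation that still compiles unchanged.
final class InMemoryStore implements SimpleStore {
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();

    @Override
    public void put(byte[] key, byte[] value) {
        data.put(ByteBuffer.wrap(key.clone()), value.clone());
    }

    @Override
    public byte[] get(byte[] key) {
        return data.get(ByteBuffer.wrap(key));
    }
}
```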
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-903313112
Sorry @vamossagar12 for the late reply. I took a look at the current PR and it looks good to me. The throughput numbers seem promising as well. Could you clean up the PR a bit while I ping @cadonna for another look?
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-887703246
Hey @guozhangwang / @cadonna, sorry to keep pinging, but did you get a chance to look at these numbers?
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703336198
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
> Also, we should try to avoid serializing data into byte[] arrays and then copy the data into directBuffers. Instead we should serialize directly into "direct" ByteBuffers. For this we might need to have RocksDBStore implement a ByteBuffer interface, e.g., KeyValueStore<Bytes, ByteBuffer>, or anything similar...
Yeah, I did play around with that idea, but the goal was to avoid changing the public APIs. If that approach makes sense, I can do another round of benchmarking for it, though we would probably need a KIP.
In fact, we could even go one step further and support only direct ByteBuffers, if that makes sense. For example, we could throw an error if the ByteBuffer isn't direct, similar to the RocksDB implementation.
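A minimal sketch of such a direct-only check, assuming an `IllegalArgumentException` is the desired failure mode. `DirectOnly.requireDirect` is a hypothetical helper, and the exact check RocksDB performs is not reproduced here.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

final class DirectOnly {
    private DirectOnly() { }

    // Rejects heap buffers up front instead of silently copying them into native memory.
    static ByteBuffer requireDirect(ByteBuffer buffer, String name) {
        Objects.requireNonNull(buffer, name + " must not be null");
        if (!buffer.isDirect()) {
            throw new IllegalArgumentException(name + " must be a direct ByteBuffer (ByteBuffer.allocateDirect)");
        }
        return buffer;
    }
}
```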
[GitHub] [kafka] guozhangwang commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644356441
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
I'm not very familiar with the direct buffer usage pattern, but currently it seems we would still try to allocate a new buffer for each put call, whereas I "thought" the main benefits come from reusing the buffer across multiple put calls? @vamossagar12 @ableegoldman @cadonna please correct me if I'm wrong.
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-910510701
Tests failed on compilation due to checkstyle warnings:
```
[2021-08-28T18:29:04.088Z] > Task :streams:streams-scala:compileTestScala
[2021-08-28T18:29:04.088Z] [Warn] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10798/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala:19: imported `Named` is permanently hidden by definition of type Named in package kstream
[2021-08-28T18:29:04.088Z] [Warn] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10798/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:24: imported `Named` is permanently hidden by definition of type Named in package kstream
[2021-08-28T18:29:04.088Z] [Warn] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10798/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala:21: imported `Named` is permanently hidden by definition of type Named in package kstream
[2021-08-28T18:29:05.982Z]
[2021-08-28T18:29:05.982Z] > Task :streams:checkstyleTest
[2021-08-28T18:29:11.170Z]
[2021-08-28T18:29:11.170Z] > Task :streams:streams-scala:compileTestScala
[2021-08-28T18:29:11.170Z] three warnings found
```
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-911592247
@guozhangwang, these warnings are not caused by the changes in this PR:
``imported `Named` is permanently hidden by definition of type Named in package kstream``
[GitHub] [kafka] cadonna commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-1027802506
@vamossagar12 Can we close this PR and the corresponding ticket since it seems we decided to look for a more complete solution?
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-915194995
> > Okay, let me re-trigger the tests.
>
> Thanks.. This time there's a compilation error due to benchmarks. I will remove that class.
@guozhangwang, I removed that class.
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-870174294
You were referring to this commit https://github.com/apache/kafka/pull/10798/commits/68a947c0eb6a5cc4bdde24083c83f4638e708edb for the tweaks, right?
BTW, it's a bit interesting that the improvement for range is around 1.442 / 1.131 ≈ 1.27 while for putAll it is 1.065 / 0.919 ≈ 1.16. Given that the key lengths are similar in the benchmarks, I was expecting the latter to benefit more. @cadonna WDYT?
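The ratios above, and the absolute gains of roughly 0.3 and 0.15 ops/s quoted earlier in the thread, can be recomputed from the posted JMH scores; `SpeedupCheck` is just an illustrative helper holding those numbers.

```java
final class SpeedupCheck {
    private SpeedupCheck() { }

    // JMH throughput scores (ops/s) copied from the benchmark runs posted in this thread.
    static final double RANGE_ORIGINAL = 1.131, RANGE_BYTEBUFFER = 1.442;
    static final double PUTALL_ORIGINAL = 0.919, PUTALL_BYTEBUFFER = 1.065;

    // Relative speedup: candidate throughput divided by baseline throughput.
    static double speedup(double baseline, double candidate) {
        return candidate / baseline;
    }

    // Absolute throughput gain in ops/s.
    static double absoluteGain(double baseline, double candidate) {
        return candidate - baseline;
    }
}
```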
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644616944
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
Thank you @guozhangwang, @cadonna. I agree that creating it every time does not make much sense. I should have been more careful before adding it and asking for the internal benchmarks. In that case, would it make sense to leave it out of an API like put and instead use it for putAll()/range/reverseRange/prefixSeek operations?
That's because, in the case of put, it is difficult to know how many put operations may be requested. If users were using the RocksDB library directly, they could create DirectByteBuffers once and push as many entries as they want.
Based on my conversations on the RocksDB side, one of the comments was this:
`Extracting large amounts of data under high concurrency, non-direct byte buffer will bring serious GC problems to the upper level Java services.`
I guess we can target those APIs? WDYT?
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-872398600
@guozhangwang, that's the commit I was referring to. Setting the byte order seems to have an impact on performance. Also, earlier I was calling put first and then flip, which was wrong, so I tweaked that.
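For reference, the usual NIO sequence for refilling a reusable direct buffer is `clear()`, `put(...)`, then `flip()` before the buffer is handed to a reader. The sketch below (class and method names are hypothetical) shows that sequence and the common pitfall of skipping `flip()`; it does not try to explain the performance effect of the byte order mentioned above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public final class DirectBufferFillOrder {

    /** Refills a reusable buffer so that a reader sees exactly the given bytes. */
    static ByteBuffer fill(final ByteBuffer target, final byte[] bytes) {
        target.clear();     // position = 0, limit = capacity
        target.put(bytes);  // position = bytes.length
        target.flip();      // limit = bytes.length, position = 0: ready to be read
        return target;
    }

    public static void main(final String[] args) {
        // order() governs the layout of multi-byte accessors (putInt, getLong, ...);
        // a bulk put(byte[]) copies the bytes as-is.
        final ByteBuffer direct = ByteBuffer.allocateDirect(64).order(ByteOrder.nativeOrder());
        final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);

        fill(direct, key);
        System.out.println("after flip: remaining = " + direct.remaining());

        // Pitfall: without flip(), position sits at the end of the data, so a reader
        // consuming [position, limit) sees the unused tail of the buffer, not the key.
        direct.clear();
        direct.put(key);
        System.out.println("without flip: position = " + direct.position()
                + ", remaining = " + direct.remaining());
    }
}
```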
I have another update on this piece. I created a small Kafka Streams app that subscribes to a single-partition topic. For every record consumed, it runs two processors: 1) a putAll processor, which ingests 1M records into the state store, and then 2) a range processor, which does a range query over all inserted keys.
I pushed 100 records to the topic, and these two processors were invoked for each one. Here are the numbers from that run:
```
DirectByteBuffer ->
Operator: putAll, Real: 80.722 CPU: 273.690 GC: 6.974 GCCount: 36 avg throughput: 1269130.717 op/s p95 throughput: 1430952.065 op/s p99 throughput: 1453862.308 op/s
Operator: range, Real: 114.511 CPU: 271.410 GC: 5.501 GCCount: 26 avg throughput: 920462.473 op/s p95 throughput: 1234215.537 op/s p99 throughput: 1588364.216 op/s
```
```
Original ->
Operator: putAll, Real: 92.988 CPU: 288.550 GC: 6.900 GCCount: 53 avg throughput: 1088404.000 op/s p95 throughput: 1233248.141 op/s p99 throughput: 1252514.840 op/s
Operator: range, Real: 110.418 CPU: 268.070 GC: 6.105 GCCount: 40 avg throughput: 957558.135 op/s p95 throughput: 1277487.581 op/s p99 throughput: 1388059.720 op/s
```
I calculate throughput per iteration by dividing 1M by the time taken for that iteration. You can find the relevant code here:
https://github.com/vamossagar12/kafka-streams-throughputbenchmarking/tree/master/src/main/java/com/bytebyffer/benchmarks
Let me know if you and @cadonna think we need other kinds of benchmarks.
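A minimal sketch of the per-iteration throughput calculation described above (records divided by elapsed seconds), plus one way to derive avg/p95/p99 figures from those samples. The nearest-rank percentile and the averaging of per-iteration throughputs are assumptions, not necessarily what the linked benchmark code does, and the timings in `main` are made up.

```java
import java.util.Arrays;

public final class IterationThroughput {

    /** ops/s for one iteration: records processed divided by elapsed seconds. */
    static double throughput(final long records, final long elapsedNanos) {
        return records / (elapsedNanos / 1_000_000_000.0);
    }

    /** Nearest-rank percentile, p in (0, 100], over a sorted copy of the samples. */
    static double percentile(final double[] samples, final double p) {
        final double[] sorted = samples.clone();
        Arrays.sort(sorted);
        final int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank, 1) - 1];
    }

    static double mean(final double[] samples) {
        return Arrays.stream(samples).average().orElse(Double.NaN);
    }

    public static void main(final String[] args) {
        final long records = 1_000_000L;
        // Hypothetical wall-clock time of each iteration, in nanoseconds.
        final long[] elapsed = {800_000_000L, 1_000_000_000L, 1_250_000_000L, 700_000_000L};
        final double[] tput = new double[elapsed.length];
        for (int i = 0; i < elapsed.length; i++) {
            tput[i] = throughput(records, elapsed[i]);
        }
        System.out.printf("avg %.1f op/s, p95 %.1f op/s, p99 %.1f op/s%n",
                mean(tput), percentile(tput, 95), percentile(tput, 99));
    }
}
```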
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-915026729
> Okay, let me re-trigger the tests.
Thanks. This time there's a compilation error caused by the benchmarks; I will remove that class.
[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r705268638
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -32,34 +34,49 @@
private final byte[] rawLastKey;
private final boolean forward;
private final boolean toInclusive;
+ private final ByteBuffer directByteBuffer;
RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
final Bytes from,
final Bytes to,
final boolean forward,
- final boolean toInclusive) {
+ final boolean toInclusive,
+ final ByteBuffer directByteBuffer) {
super(storeName, iter, openIterators, forward);
this.forward = forward;
this.toInclusive = toInclusive;
+ this.directByteBuffer = directByteBuffer;
if (forward) {
if (from == null) {
iter.seekToFirst();
} else {
- iter.seek(from.get());
+ allocateDirectByteBufferAndSeek(iter, from, true);
}
rawLastKey = to == null ? null : to.get();
} else {
if (to == null) {
iter.seekToLast();
} else {
- iter.seekForPrev(to.get());
+ allocateDirectByteBufferAndSeek(iter, to, false);
}
rawLastKey = from == null ? null : from.get();
}
}
+ private void allocateDirectByteBufferAndSeek(final RocksIterator iter, final Bytes bytes, final boolean forward) {
+ /*final ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(bytes.get().length);
+ directByteBuffer.order(ByteOrder.nativeOrder());*/
+ directByteBuffer.clear();
+ directByteBuffer.put(bytes.get());
+ directByteBuffer.flip();
Review comment:
You might want to pull this up, or into a separate method, to avoid checking the direction twice.
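One way to act on this suggestion, sketched under assumptions: the helper only copies the key into the reusable direct buffer, and the existing `if (forward)` branch calls `seek` or `seekForPrev` directly, so the direction is tested once. `SeekableIterator` is a hypothetical stand-in for `RocksIterator` so the sketch runs without RocksDB; whether a given RocksJava version offers `ByteBuffer` overloads of `seek`/`seekForPrev` should be checked against its documentation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public final class RangeSeekSketch {

    /** Hypothetical stand-in for the iterator; only the calls this sketch needs. */
    interface SeekableIterator {
        void seekToFirst();
        void seekToLast();
        void seek(ByteBuffer target);
        void seekForPrev(ByteBuffer target);
    }

    private final ByteBuffer directByteBuffer;

    RangeSeekSketch(final ByteBuffer directByteBuffer) {
        this.directByteBuffer = directByteBuffer;
    }

    /** Direction-agnostic: only copies the key into the reusable direct buffer. */
    private ByteBuffer fillDirectBuffer(final byte[] key) {
        directByteBuffer.clear();
        directByteBuffer.put(key);
        directByteBuffer.flip();
        return directByteBuffer;
    }

    /** The direction is checked exactly once; each branch calls the matching seek. */
    void seekStart(final SeekableIterator iter, final byte[] from, final byte[] to, final boolean forward) {
        if (forward) {
            if (from == null) {
                iter.seekToFirst();
            } else {
                iter.seek(fillDirectBuffer(from));
            }
        } else {
            if (to == null) {
                iter.seekToLast();
            } else {
                iter.seekForPrev(fillDirectBuffer(to));
            }
        }
    }

    public static void main(final String[] args) {
        final List<String> calls = new ArrayList<>();
        final SeekableIterator recorder = new SeekableIterator() {
            public void seekToFirst() { calls.add("seekToFirst"); }
            public void seekToLast() { calls.add("seekToLast"); }
            public void seek(final ByteBuffer t) { calls.add("seek:" + t.remaining()); }
            public void seekForPrev(final ByteBuffer t) { calls.add("seekForPrev:" + t.remaining()); }
        };
        final RangeSeekSketch sketch = new RangeSeekSketch(ByteBuffer.allocateDirect(32));
        sketch.seekStart(recorder, new byte[] {1, 2, 3}, null, true);
        sketch.seekStart(recorder, null, new byte[] {4, 5}, false);
        System.out.println(calls);
    }
}
```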
[GitHub] [kafka] cadonna commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644552319
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
Yes, I had the same thought as @guozhangwang, but I am also not familiar with direct buffers.
[GitHub] [kafka] vamossagar12 closed pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 closed pull request #10798:
URL: https://github.com/apache/kafka/pull/10798
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r704376366
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
@patrickstuedi, I resorted to creating the DirectByteBuffers only once instead of on every call (one for putAll keys and values, and one for range). I have checked in that code for reference. However, I don't see much difference in the throughput numbers. I ran it on the same setup (pushing 100 messages to a single-partition topic and, for each message, pushing 1M keys using putAll and reading them back through a range() call).
I did 3 runs per approach (original vs. ByteBuffer). Here are the numbers:
```
ByteBuffer
Operator: putAll, Real: 87.206 CPU: 371.460 GC: 11.048 GCCount: 30 avg throughput: 1206503.194 op/s p95 throughput: 1423370.427 op/s p99 throughput: 1437098.797 op/s
Operator: range, Real: 115.121 CPU: 328.090 GC: 7.925 GCCount: 20 avg throughput: 923332.052 op/s p95 throughput: 1261332.193 op/s p99 throughput: 1308671.917 op/s
Operator: putAll, Real: 87.274 CPU: 382.920 GC: 11.477 GCCount: 29 avg throughput: 1211901.123 op/s p95 throughput: 1432304.571 op/s p99 throughput: 1447671.672 op/s
Operator: range, Real: 116.886 CPU: 335.610 GC: 8.230 GCCount: 21 avg throughput: 911159.182 op/s p95 throughput: 1234183.861 op/s p99 throughput: 1255574.938 op/s
Operator: putAll, Real: 84.438 CPU: 366.390 GC: 10.992 GCCount: 29 avg throughput: 1254820.856 op/s p95 throughput: 1481654.517 op/s p99 throughput: 1508043.762 op/s
Operator: range, Real: 114.877 CPU: 338.090 GC: 8.465 GCCount: 21 avg throughput: 935155.065 op/s p95 throughput: 1254738.341 op/s p99 throughput: 1280340.688 op/s
Original
Operator: putAll, Real: 95.037 CPU: 378.850 GC: 11.078 GCCount: 29 avg throughput: 1100406.576 op/s p95 throughput: 1292393.480 op/s p99 throughput: 1297963.396 op/s
Operator: range, Real: 111.177 CPU: 328.180 GC: 8.299 GCCount: 21 avg throughput: 967757.168 op/s p95 throughput: 1281079.326 op/s p99 throughput: 1296172.722 op/s
Operator: putAll, Real: 95.186 CPU: 356.040 GC: 10.132 GCCount: 28 avg throughput: 1092794.645 op/s p95 throughput: 1257861.812 op/s p99 throughput: 1286016.834 op/s
Operator: range, Real: 112.568 CPU: 347.350 GC: 9.163 GCCount: 25 avg throughput: 952179.810 op/s p95 throughput: 1298717.792 op/s p99 throughput: 1323234.359 op/s
Operator: putAll, Real: 97.332 CPU: 400.690 GC: 12.000 GCCount: 30 avg throughput: 1079682.386 op/s p95 throughput: 1283925.766 op/s p99 throughput: 1290499.661 op/s
Operator: range, Real: 109.653 CPU: 319.020 GC: 7.995 GCCount: 20 avg throughput: 980557.905 op/s p95 throughput: 1298695.144 op/s p99 throughput: 1313645.941 op/s
```
One thing I am noticing is that range is better with the original implementation. This contradicts the initial numbers I had posted [here](https://github.com/apache/kafka/pull/10798#issuecomment-872398600), which were wrong due to a bug in my implementation back then.
Also, regarding our discussion about capacity allocation: RocksDB places a limit of 8 MB on keys and 3 GB on values. Either way, it doesn't recommend storing large keys/values. Are there any configs for key/value sizes for stores? I couldn't find any.
Lastly, with the pre-allocation approach for interactive queries, which allow multi-threaded access, do we want to restrict concurrency to N buffers as you suggested above?
What do you and @guozhangwang / @cadonna think?
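To make the comparison across the three runs concrete, this sketch averages the `avg throughput` values per approach and takes the ratio (numbers copied from the output above; `RunAverages` is just an illustrative name).

```java
public final class RunAverages {

    static double mean(final double... values) {
        double sum = 0.0;
        for (final double v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    public static void main(final String[] args) {
        // "avg throughput" (op/s) of the three runs per approach.
        final double bbPutAll = mean(1206503.194, 1211901.123, 1254820.856);
        final double origPutAll = mean(1100406.576, 1092794.645, 1079682.386);
        final double bbRange = mean(923332.052, 911159.182, 935155.065);
        final double origRange = mean(967757.168, 952179.810, 980557.905);

        System.out.printf("putAll: ByteBuffer %.0f vs original %.0f op/s (ratio %.3f)%n",
                bbPutAll, origPutAll, bbPutAll / origPutAll);
        System.out.printf("range:  ByteBuffer %.0f vs original %.0f op/s (ratio %.3f)%n",
                bbRange, origRange, bbRange / origRange);
    }
}
```

With these numbers the ByteBuffer variant comes out at roughly 1.12x the original for putAll and roughly 0.95x for range, in line with the observation that range is better with the original implementation.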
[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703439679
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
Re (1): if all use cases are single-threaded, then yes, we can allocate some buffer(s) as part of the store. Otherwise, if you need to support multiple concurrent ops, you could pre-populate a queue with N buffers, and N becomes the maximum number of concurrent requests you can serve. If your queue is empty, a request would have to wait until at least one of the outstanding requests completes and returns its buffer to the queue. Again, that might not be needed at all given the API is single-threaded.
Re (2): is there a max size, perhaps given by the configured maximum Kafka message size (if such a limit exists and is not too big)?
If we don't want to change the API (I guess it would be the RocksDBStore interface that changes, which I think is not exposed, but still), then splitting this work into part I, where we copy heap buffers into direct buffers, and part II, where we serialize directly into direct buffers, is a way to go.
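A minimal sketch of the bounded pool described in (1), assuming fixed-size buffers: N direct buffers are pre-allocated into a `BlockingQueue`, so at most N operations hold a buffer at once and the next caller blocks (or times out) until one is released. `DirectBufferPool` and its methods are hypothetical, not an existing Kafka or RocksJava API.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical fixed-size pool of N pre-allocated direct buffers. */
public final class DirectBufferPool {
    private final BlockingQueue<ByteBuffer> free;
    private final int bufferSize;

    public DirectBufferPool(final int buffers, final int bufferSize) {
        this.free = new ArrayBlockingQueue<>(buffers);
        this.bufferSize = bufferSize;
        for (int i = 0; i < buffers; i++) {
            free.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    /** Blocks until a buffer is free; N therefore caps the number of concurrent operations. */
    public ByteBuffer acquire() throws InterruptedException {
        final ByteBuffer buffer = free.take();
        buffer.clear();
        return buffer;
    }

    /** Like acquire(), but gives up after the timeout and returns null. */
    public ByteBuffer tryAcquire(final long timeout, final TimeUnit unit) throws InterruptedException {
        final ByteBuffer buffer = free.poll(timeout, unit);
        if (buffer != null) {
            buffer.clear();
        }
        return buffer;
    }

    /** Basic sanity checks: rejects heap or wrongly sized buffers, and releases that would overfill the pool. */
    public void release(final ByteBuffer buffer) {
        if (!buffer.isDirect() || buffer.capacity() != bufferSize || !free.offer(buffer)) {
            throw new IllegalArgumentException("buffer does not belong to this pool");
        }
    }

    public int available() {
        return free.size();
    }

    public static void main(final String[] args) throws InterruptedException {
        final DirectBufferPool pool = new DirectBufferPool(2, 1024);
        final ByteBuffer a = pool.acquire();
        final ByteBuffer b = pool.acquire();
        // Both buffers are in use, so a third caller would wait; here it times out instead.
        System.out.println("third acquire: " + pool.tryAcquire(10, TimeUnit.MILLISECONDS));
        pool.release(a);
        pool.release(b);
        System.out.println("available after release: " + pool.available());
    }
}
```

Callers would typically release in a `finally` block so that an exception cannot leak a buffer and permanently shrink the pool.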
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-864174533
@cadonna, @guozhangwang I ran some JMH benchmarks on this: two tests with G1GC and the GC profiler (`-prof gc`), one for putAll and the other for range queries. Here are the results for putAll:
```
Original AK codebase
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance thrpt 15 60.489 ± 2.154 ops/s
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.alloc.rate thrpt 15 158.731 ± 5.651 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.alloc.rate.norm thrpt 15 2889264.628 ± 26.474 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.PS_Eden_Space thrpt 15 160.811 ± 9.073 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.PS_Eden_Space.norm thrpt 15 2927507.100 ± 141145.743 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.PS_Survivor_Space thrpt 15 0.251 ± 0.145 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.PS_Survivor_Space.norm thrpt 15 4536.878 ± 2486.338 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.count thrpt 15 161.000 counts
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.time
```
```
DirectByteBuffer
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance thrpt 15 97.908 ± 1.820 ops/s
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.alloc.rate thrpt 15 256.946 ± 4.777 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.alloc.rate.norm thrpt 15 2889553.977 ± 4.331 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Eden_Space thrpt 15 256.202 ± 51.550 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Eden_Space.norm thrpt 15 2879662.678 ± 563793.832 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Old_Gen thrpt 15 0.030 ± 0.039 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Old_Gen.norm thrpt 15 336.245 ± 437.382 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Survivor_Space thrpt 15 0.025 ± 0.105 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Survivor_Space.norm thrpt 15 285.618 ± 1182.589 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.count thrpt 15 33.000 counts
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.time thrpt 15 308.000 ms
```
And here are the results for range:
```
Original AK code
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance thrpt 15 117.454 ± 2.620 ops/s
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate thrpt 15 290.118 ± 6.471 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate.norm thrpt 15 2719700.009 ± 32.687 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space thrpt 15 295.066 ± 64.171 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space.norm thrpt 15 2768668.326 ± 613598.308 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen thrpt 15 0.002 ± 0.005 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen.norm thrpt 15 16.068 ± 47.296 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space thrpt 15 0.051 ± 0.210 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space.norm thrpt 15 466.034 ± 1929.591 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.count thrpt 15 38.000 counts
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.time thrpt 15 431.000 ms
```
```
DirectByteBuffer
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance thrpt 15 114.871 ± 4.080 ops/s
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate thrpt 15 284.127 ± 10.090 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate.norm thrpt 15 2723428.297 ± 33.503 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space thrpt 15 286.413 ± 64.054 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space.norm thrpt 15 2743440.651 ± 593666.103 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen thrpt 15 0.007 ± 0.024 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen.norm thrpt 15 67.053 ± 224.086 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space thrpt 15 0.025 ± 0.105 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space.norm thrpt 15 239.812 ± 992.929 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.count thrpt 15 37.000 counts
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.time thrpt 15 515.000 ms
```
Throughput-wise, putAll improved by 30+ ops/s (60.489 to 97.908 ops/s). Range performed slightly worse than in the original codebase (114.871 vs. 117.454 ops/s). I can change the benchmark mode to AverageTime for the range query.
One thing I am noticing is that the DirectByteBuffer runs clock a higher GC allocation rate for putAll (256.946 vs. 158.731 MB/sec, although the normalized allocation per op is nearly identical at about 2.89 MB/op) and a higher GC time for range (515 vs. 431 ms). You can find the benchmark-related code here: https://github.com/apache/kafka/pull/10842/files.
My guess is that, for this to actually show gains, the application should allocate directly instead of going through byte[]. That doesn't seem possible with the way state stores are exposed via the DSL, because ultimately everything is serialized into a byte[]. What do you think?
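To illustrate the difference between part I (copy a heap `byte[]` into a direct buffer) and part II (serialize straight into the direct buffer), here is a sketch with a hypothetical fixed-size key made of a long id, a long timestamp, and an int sequence number. It is not the Kafka `Serializer` API, whose `serialize` method returns `byte[]`; that return type is what forces the extra heap copy today.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public final class DirectSerializationSketch {

    /** Hypothetical key layout: 8-byte id, 8-byte timestamp, 4-byte sequence number. */
    static final int KEY_SIZE = Long.BYTES + Long.BYTES + Integer.BYTES;

    /** Part I style: serialize into a fresh heap byte[] (one allocation per call), then copy it. */
    static ByteBuffer viaHeapArray(final ByteBuffer direct, final long id, final long timestamp, final int seq) {
        final byte[] bytes = ByteBuffer.allocate(KEY_SIZE).putLong(id).putLong(timestamp).putInt(seq).array();
        direct.clear();
        direct.put(bytes);
        direct.flip();
        return direct;
    }

    /** Part II style: write the fields straight into the reusable direct buffer; no intermediate byte[]. */
    static ByteBuffer directly(final ByteBuffer direct, final long id, final long timestamp, final int seq) {
        direct.clear();
        direct.putLong(id).putLong(timestamp).putInt(seq);
        direct.flip();
        return direct;
    }

    public static void main(final String[] args) {
        // Default BIG_ENDIAN order is kept on purpose so both paths produce identical bytes.
        final ByteBuffer direct = ByteBuffer.allocateDirect(KEY_SIZE);
        final byte[] a = new byte[KEY_SIZE];
        final byte[] b = new byte[KEY_SIZE];
        viaHeapArray(direct, 42L, 1_700_000_000_000L, 7).get(a);
        directly(direct, 42L, 1_700_000_000_000L, 7).get(b);
        System.out.println("same encoding: " + Arrays.equals(a, b));
    }
}
```

Both paths keep the buffer's default big-endian order; switching the direct buffer to `ByteOrder.nativeOrder()` would change how `putLong`/`putInt` lay out the fields on little-endian hardware and, under a bytewise comparator, the key ordering.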
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-919419275
> hey @guozhangwang, that makes perfect sense to me. Based on the discussion @patrickstuedi and I had on this thread, I think that to reap the real benefits, the optimisations listed here and on the JIRA make sense.
> We can hold off on this PR until then. Thanks!
Great, thanks @vamossagar12 !
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644672950
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
OK, that makes sense. High concurrency is one of the cases where this might be useful. Having said that, the RocksDB PR includes benchmark numbers for a large number of single-threaded put operations: according to those numbers, the direct byte buffer was 37% faster, with 0 GC cycles.
Here is the comment: https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
Users of Kafka Streams might call put() in the same manner, looping through a batch of records and calling put() for each one. On the state store side, we could create one DirectByteBuffer for put() and keep reusing it (subject to testing). But that might not always be the case.
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-1035710408
@cadonna , done.
[GitHub] [kafka] cadonna commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644633213
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
The only case I can think of that might have high concurrency on a RocksDB state store is interactive queries. Without interactive queries there is no concurrency on the state stores, since only the stream thread that is assigned the stateful task owning a state store accesses that store.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644356441
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
I'm not very familiar with the direct buffer usage pattern, but currently it seems we would still try to allocate a new buffer for each put call, whereas I "thought" the main benefits come from reusing the buffer across multiple put calls? @vamossagar12 @ableegoldman @cadonna please correct me if I'm wrong.
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-907668633
Thanks @guozhangwang, I have made changes to the PR. I think it can be reviewed now, so I moved it from draft to Ready for Review.
[GitHub] [kafka] vamossagar12 edited a comment on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
Posted by GitBox <gi...@apache.org>.
vamossagar12 edited a comment on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-864174533
@cadonna , @guozhangwang I ran some jmh benchmarks on this. I ran the 2 tests with G1GC and prof gc=> 1 for putAll and the other one for range query. Here are the results for putAll:
```
Original AK codebase
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance thrpt 15 60.489 ? 2.154 ops/s
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.alloc.rate thrpt 15 158.731 ? 5.651 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.alloc.rate.norm thrpt 15 2889264.628 ? 26.474 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.PS_Eden_Space thrpt 15 160.811 ? 9.073 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.PS_Eden_Space.norm thrpt 15 2927507.100 ? 141145.743 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.PS_Survivor_Space thrpt 15 0.251 ? 0.145 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.PS_Survivor_Space.norm thrpt 15 4536.878 ? 2486.338 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.count thrpt 15 161.000 counts
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.time
```
```
DirectByteBuffer
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance thrpt 15 97.908 ? 1.820 ops/s
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.alloc.rate thrpt 15 256.946 ? 4.777 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.alloc.rate.norm thrpt 15 2889553.977 ? 4.331 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Eden_Space thrpt 15 256.202 ? 51.550 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Eden_Space.norm thrpt 15 2879662.678 ? 563793.832 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Old_Gen thrpt 15 0.030 ? 0.039 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Old_Gen.norm thrpt 15 336.245 ? 437.382 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Survivor_Space thrpt 15 0.025 ? 0.105 MB/sec
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Survivor_Space.norm thrpt 15 285.618 ? 1182.589 B/op
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.count thrpt 15 33.000 counts
StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.time thrpt 15 308.000 ms
```
And here are the results for range:
```
Original AK code
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance thrpt 15 117.454 ? 2.620 ops/s
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.alloc.rate thrpt 15 290.118 ? 6.471 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.alloc.rate.norm thrpt 15 2719700.009 ? 32.687 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Eden_Space thrpt 15 295.066 ? 64.171 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Eden_Space.norm thrpt 15 2768668.326 ? 613598.308 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Old_Gen thrpt 15 0.002 ? 0.005 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Old_Gen.norm thrpt 15 16.068 ? 47.296 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Survivor_Space thrpt 15 0.051 ? 0.210 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Survivor_Space.norm thrpt 15 466.034 ? 1929.591 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.count thrpt 15 38.000 counts
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.time thrpt 15 431.000 ms
```
```
DirectByteBuffer
Benchmark Mode Cnt Score Error Units
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance thrpt 15 114.871 ± 4.080 ops/s
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate thrpt 15 284.127 ± 10.090 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate.norm thrpt 15 2723428.297 ± 33.503 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space thrpt 15 286.413 ± 64.054 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space.norm thrpt 15 2743440.651 ± 593666.103 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen thrpt 15 0.007 ± 0.024 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen.norm thrpt 15 67.053 ± 224.086 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space thrpt 15 0.025 ± 0.105 MB/sec
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space.norm thrpt 15 239.812 ± 992.929 B/op
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.count thrpt 15 37.000 counts
StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.time thrpt 15 515.000 ms
```
Throughput-wise, there was a jump of 30+ ops/s for the putAll case. Range performed slightly worse in the original codebase. I can change the benchmark mode to AverageTime for the range query.
One thing I am noticing is that the DirectByteBuffer runs consistently show a higher GC allocation rate, GC count and GC time. You can find the benchmark code here: https://github.com/apache/kafka/pull/10842/files.
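To cross-check the `gc.count` and `gc.time` numbers outside of JMH, the JDK's `GarbageCollectorMXBean` exposes cumulative collection counts and times. Below is a minimal sketch; `GcDelta`, `snapshot` and `measure` are hypothetical names, the 32-byte key and 100,000 iterations are arbitrary assumptions, and since the counters are JVM-wide, activity from other threads also shows up in the deltas. One possible contributor worth checking: each `ByteBuffer.allocateDirect` call still creates a small on-heap `DirectByteBuffer` object, and its off-heap memory is only released after that object has been garbage collected.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class GcDelta {
    /** Cumulative {collection count, collection time in ms}, summed over all collectors. */
    static long[] snapshot() {
        long count = 0;
        long timeMs = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 when the value is undefined for a collector.
            count += Math.max(0, gc.getCollectionCount());
            timeMs += Math.max(0, gc.getCollectionTime());
        }
        return new long[] {count, timeMs};
    }

    /** Runs the workload and returns {gc count delta, gc time delta in ms}. */
    static long[] measure(Runnable workload) {
        long[] before = snapshot();
        workload.run();
        long[] after = snapshot();
        return new long[] {after[0] - before[0], after[1] - before[1]};
    }

    public static void main(String[] args) {
        byte[] key = new byte[32]; // assumed key size
        // Mimics a fresh allocateDirect per call (assumed to be followed by copying the bytes in).
        long[] perCall = measure(() -> {
            for (int i = 0; i < 100_000; i++) {
                ByteBuffer buf = ByteBuffer.allocateDirect(key.length);
                buf.put(key);
                buf.flip();
            }
        });
        // Same work with one buffer that is reused on every iteration.
        ByteBuffer reused = ByteBuffer.allocateDirect(key.length);
        long[] reuse = measure(() -> {
            for (int i = 0; i < 100_000; i++) {
                reused.clear();
                reused.put(key);
                reused.flip();
            }
        });
        System.out.println("per-call allocation: gc.count=" + perCall[0] + ", gc.time=" + perCall[1] + " ms");
        System.out.println("reused buffer:       gc.count=" + reuse[0] + ", gc.time=" + reuse[1] + " ms");
    }
}
```

The numbers will vary between runs and JVMs, so this is only a sanity check next to the JMH `-prof gc` output, not a replacement for it.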
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-870174294
You were referring to this commit https://github.com/apache/kafka/pull/10798/commits/68a947c0eb6a5cc4bdde24083c83f4638e708edb for the tweaks, right?
BTW, it's a bit interesting to see that the improvement for range is around 1.442 / 1.131 = 1.27, while for putAll it is 1.065 / 0.919 = 1.15. Given that the key lengths are similar in the benchmarks, I was expecting the latter to benefit more. @cadonna WDYT?
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-917365748
Hey @guozhangwang, that makes perfect sense to me. Based on the discussion @patrickstuedi and I had on this thread, I think the optimisations listed here and on the JIRA make sense if we want to reap the real benefits.
We can hold off on this PR till then. Thanks!
[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703309462
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
@vamossagar12 great to see you're about to add direct buffer support; I was actually also looking into this. Just a few comments: as pointed out already, one important aspect to keep in mind with direct buffers is that allocation and disposal are more expensive than with regular heap buffers, so it's important to re-use them. It's common practice to keep a pool of direct buffers around and re-use them on a per-call basis (most I/O libraries do this, e.g., Netty, Crail, etc.). You can keep queues of free and used buffers and move them around during state store operations.
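A minimal sketch of the "queues of free and used buffers" idea, assuming a fixed per-buffer capacity, one pool per stream thread (so no locking), and that the native call copies the bytes before it returns, so buffers can be released right after the write. `DirectBufferPool`, `acquire` and `releaseAll` are hypothetical names, not an existing Kafka Streams or RocksDB API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical pool of direct buffers: buffers are allocated up front and moved
 * between a "free" and a "used" queue instead of being allocated per call.
 * Not thread-safe; one pool per thread/store is assumed.
 */
public class DirectBufferPool {
    private final int bufferCapacity;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private final Deque<ByteBuffer> used = new ArrayDeque<>();

    public DirectBufferPool(int poolSize, int bufferCapacity) {
        this.bufferCapacity = bufferCapacity;
        for (int i = 0; i < poolSize; i++) {
            free.push(ByteBuffer.allocateDirect(bufferCapacity));
        }
    }

    /** Takes a free buffer (allocating only if the pool is exhausted), copies bytes in, and flips it for reading. */
    public ByteBuffer acquire(byte[] bytes) {
        if (bytes.length > bufferCapacity) {
            throw new IllegalArgumentException(
                "payload of " + bytes.length + " bytes exceeds buffer capacity " + bufferCapacity);
        }
        ByteBuffer buf = free.isEmpty() ? ByteBuffer.allocateDirect(bufferCapacity) : free.pop();
        buf.clear();
        buf.put(bytes);
        buf.flip();
        used.push(buf);
        return buf;
    }

    /** Moves every in-use buffer back to the free queue, e.g. after a put/putAll call has returned. */
    public void releaseAll() {
        while (!used.isEmpty()) {
            free.push(used.pop());
        }
    }

    public int freeCount() {
        return free.size();
    }

    public int usedCount() {
        return used.size();
    }
}
```

In a putAll, for instance, one would acquire a key and a value buffer per entry, hand them to the native write, and call `releaseAll()` once it returns. Buffers allocated when the pool runs dry are kept afterwards, so the pool grows to the peak batch size; capping that, or handling oversized keys/values differently, would be a design decision for the actual store.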
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-914624644
Okay, let me re-trigger the tests.
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-898433681
@cadonna, @guozhangwang, please review the numbers whenever you get the chance.
[GitHub] [kafka] cadonna commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
cadonna commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644566174
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
The [javadocs](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html) say
> The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers.
So it seems allocating a direct buffer during each put might decrease performance. The internal benchmarks we ran on this PR did not show any increase in throughput, but rather a decrease. However, I do not think the decrease was significant, so the cause could also just be a bad day for the environment. Anyway, continuously allocating a direct buffer does not seem to be a good idea, as the javadocs also say:
> It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance.
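Following that recommendation, the simplest alternative to per-put allocation is one long-lived direct buffer that is reused across calls and only reallocated when a larger payload arrives. A minimal sketch; `ReusableDirectBuffer` is a hypothetical name, doubling on growth is an assumed policy, and it presumes single-threaded use where the caller is done with the returned buffer before the next `wrap` call overwrites it.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical holder for one long-lived direct buffer that is reused across
 * calls and reallocated only when a payload does not fit. Not thread-safe.
 */
public class ReusableDirectBuffer {
    private ByteBuffer buffer;
    private int allocations = 0;

    public ReusableDirectBuffer(int initialCapacity) {
        buffer = ByteBuffer.allocateDirect(initialCapacity);
        allocations++;
    }

    /** Copies bytes into the shared buffer and returns it flipped for reading. */
    public ByteBuffer wrap(byte[] bytes) {
        if (bytes.length > buffer.capacity()) {
            // Grow geometrically so a stream of slightly larger payloads does not reallocate every time.
            int newCapacity = Math.max(bytes.length, buffer.capacity() * 2);
            buffer = ByteBuffer.allocateDirect(newCapacity);
            allocations++;
        }
        buffer.clear();
        buffer.put(bytes);
        buffer.flip();
        return buffer;
    }

    /** Number of direct allocations so far (1 for the initial buffer). */
    public int allocations() {
        return allocations;
    }

    public int capacity() {
        return buffer.capacity();
    }
}
```

With a sensible initial capacity for the expected key size, steady-state puts would then perform no direct allocations at all.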
Another thing to consider:
> The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious.
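Since that off-heap memory does not show up in heap metrics, it may be worth tracking it explicitly while benchmarking. The JDK exposes it through `BufferPoolMXBean`; below is a minimal sketch, assuming a HotSpot/OpenJDK-style JVM where the pool is named `"direct"` (the class name `DirectMemoryReport` and the 1 MiB allocation are illustrative only). Its upper bound can be capped with `-XX:MaxDirectMemorySize`.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

public class DirectMemoryReport {
    /** Returns the platform's "direct" buffer pool bean, or null if the JVM does not expose one. */
    static BufferPoolMXBean directPool() {
        List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        BufferPoolMXBean direct = directPool();
        if (direct == null) {
            System.out.println("no direct buffer pool exposed by this JVM");
            return;
        }
        ByteBuffer buf = ByteBuffer.allocateDirect(1 << 20); // 1 MiB off-heap
        System.out.println("direct buffers: count=" + direct.getCount()
                + ", capacity=" + direct.getTotalCapacity() + " B"
                + ", used=" + direct.getMemoryUsed() + " B");
        System.out.println("heap used: "
                + ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() + " B");
        buf.put(0, (byte) 1); // keeps buf reachable until after the report
    }
}
```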
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-872399579
> You were referring to this commit [68a947c](https://github.com/apache/kafka/commit/68a947c0eb6a5cc4bdde24083c83f4638e708edb) as for the tweaks right?
>
> BTW it's bit interesting to see the improvement for range is around 1.442 / 1.131 = 1.27 while for putAll it is 1.065 / 0.919 = 1.15. Given the key length to be similar in the benchmarks I was expecting latter has a bigger benefit. @cadonna WDYT?
Yeah, range performed better on JMH. However, in one of the runs putAll clocked around 1.095 and above. Each run gives slightly different numbers, but they tend to be higher than without the direct byte buffer.
[GitHub] [kafka] cadonna commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
cadonna commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644699017
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
}
}
+ private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
Review comment:
Yes, I think you should try to benchmark putAll()/range/reverseRange/prefixSeek operations with a simple Kafka Streams app, as you proposed. That would help us better understand the potential of direct buffers for Kafka Streams. Maybe also experiment with different key and value sizes.