Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/31 14:43:25 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

vamossagar12 opened a new pull request #10798:
URL: https://github.com/apache/kafka/pull/10798


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703333661



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       > @vamossagar12 great to see you're about to add direct buffer support, I was actually also looking into this. Just a few comments: as pointed out already, one important aspect to keep in mind with direct buffers is that allocation and disposal are more expensive than with regular heap buffers, so it's important to re-use them. It's common practice to keep a pool of direct buffers around and re-use them on a per-call basis (most I/O libraries do this, e.g., Netty, Crail, etc.). You can keep queues of free and used buffers and move them around during state store operations.
   
   Thanks @patrickstuedi , that's a great suggestion. I think it makes sense given the expense of allocation/disposal. A couple of questions:
   
   1) I think the factor to consider would be how to manage the pool of used/free buffers. As already pointed out by @cadonna above, the only case where we can expect concurrent access to state stores is IQ; otherwise access is single threaded. The single-threaded case is straightforward, but for IQ, what happens if a request comes in and none of the direct byte buffers are `free`? Do we re-allocate on the fly?
   2) This might be a small issue, but calling it out: we need to allocate some capacity for the direct byte buffer. Currently, I am iterating over the list of keys and values and using the max length, which again may not be the most efficient way. If I were to create the pool, then I would need to know the capacity to allocate upfront. Could this be exposed as a config for users in that case? (A rough sketch of such a pool follows below.)
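   A minimal sketch of the pooling idea discussed above, assuming a fixed buffer capacity (e.g. taken from a config) and a bounded pool; the class and names are hypothetical and not part of this PR. Taking from an empty queue blocks, which effectively caps the number of concurrent IQ callers:
   
   ```
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   
   // Hypothetical helper: a fixed-size pool of reusable direct buffers.
   class DirectBufferPool {
       private final BlockingQueue<ByteBuffer> free;
   
       DirectBufferPool(final int poolSize, final int bufferCapacity) {
           free = new ArrayBlockingQueue<>(poolSize);
           for (int i = 0; i < poolSize; i++) {
               free.add(ByteBuffer.allocateDirect(bufferCapacity).order(ByteOrder.nativeOrder()));
           }
       }
   
       // Blocks while all buffers are in use, so N buffers cap the number of concurrent callers.
       ByteBuffer acquire() throws InterruptedException {
           final ByteBuffer buffer = free.take();
           buffer.clear();
           return buffer;
       }
   
       // Returns a buffer to the pool once the state store operation has finished with it.
       void release(final ByteBuffer buffer) {
           free.offer(buffer);
       }
   }
   ```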







[GitHub] [kafka] vamossagar12 edited a comment on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 edited a comment on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-866823035


   @guozhangwang / @cadonna  I made some tweaks to the code and also started testing with 1M keys. Now I see differences in throughput for both the range and putAll queries, of around 0.3 ops/s and 0.15 ops/s respectively:
   
   Here is the comparison:
   
   ```
   testPersistentRangeQueryPerformance original
   
   Benchmark                                                             Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.131 ? 0.028  ops/s
   
   testPersistentPutAllPerformance original
   
   Benchmark                                                         Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance  thrpt   15  0.919 ? 0.037  ops/s
   
   
   testPersistentRangeQueryPerformance bytebuffer
   
   Benchmark                                                             Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.442 ? 0.038  ops/s
   
   testPersistentPutAllPerformance bytebuffer
   
   Benchmark                                                         Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance  thrpt   15  1.065 ? 0.041  ops/s
   ```
   
   I needed to set the ByteOrder after creating the DirectByteBuffer object. And for putAll, I needed to flip the buffer before calling `put` on the batch.
   
   The next step for me would be to create a Kafka Streams app and test throughput. I will post those numbers here once I have them as well.
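   For reference, a rough sketch of the pattern described above (native byte order plus a flip before the buffer is handed to RocksDB). It assumes the ByteBuffer-based put overload available in newer RocksDB Java releases, so treat it as illustrative rather than the exact PR code:
   
   ```
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   import org.rocksdb.ColumnFamilyHandle;
   import org.rocksdb.RocksDB;
   import org.rocksdb.RocksDBException;
   import org.rocksdb.WriteOptions;
   
   final class DirectPutSketch {
       private DirectPutSketch() { }
   
       // Copies serialized key/value bytes into reusable direct buffers and writes them through
       // RocksDB's ByteBuffer put (assumed to exist in the RocksDB version in use).
       static void putDirect(final RocksDB db,
                             final ColumnFamilyHandle cf,
                             final WriteOptions writeOptions,
                             final ByteBuffer keyBuffer,
                             final ByteBuffer valueBuffer,
                             final byte[] key,
                             final byte[] value) throws RocksDBException {
           keyBuffer.clear();
           keyBuffer.order(ByteOrder.nativeOrder()); // byte order has to be set explicitly
           keyBuffer.put(key);
           keyBuffer.flip();                         // flip before RocksDB reads the buffer
   
           valueBuffer.clear();
           valueBuffer.order(ByteOrder.nativeOrder());
           valueBuffer.put(value);
           valueBuffer.flip();
   
           db.put(cf, writeOptions, keyBuffer, valueBuffer);
       }
   }
   ```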





[GitHub] [kafka] cadonna commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644552319



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       Yes, I had the same thought as @guozhangwang, but I am also not familiar with direct buffers.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       The [javadocs](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html) say
   
   > The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers.
   
    So it seems allocating a direct buffer during each put might decrease performance. The internal benchmarks we ran on this PR did not show any increase in throughput, but rather a decrease. However, I do not think the decrease was significant, so the cause could also just be a bad day for the environment. Anyway, continuously allocating a direct buffer does not seem to be a good idea, as the javadocs also say:
   
   > It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measurable gain in program performance.
      
   Another thing to consider:
   
   > The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious.
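   To make the "large, long-lived buffers" guidance concrete, one hedged option is a per-store buffer that is allocated once and only re-allocated when a record exceeds its current capacity; names and the initial capacity are illustrative, not from the PR, and this assumes single-threaded access by the owning stream thread:
   
   ```
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   
   // Hypothetical per-store helper: one long-lived direct buffer reused across puts.
   class ReusableDirectBuffer {
       private ByteBuffer buffer =
           ByteBuffer.allocateDirect(64 * 1024).order(ByteOrder.nativeOrder());
   
       // Grows the buffer only when the payload does not fit, so the allocation cost
       // described in the javadocs above is paid rarely instead of on every put.
       ByteBuffer fillWith(final byte[] bytes) {
           if (buffer.capacity() < bytes.length) {
               buffer = ByteBuffer.allocateDirect(bytes.length).order(ByteOrder.nativeOrder());
           }
           buffer.clear();
           buffer.put(bytes);
           buffer.flip();
           return buffer;
       }
   }
   ```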

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
    The only case I can think of that might have high concurrency on the RocksDB state store is interactive queries. Without interactive queries there is no concurrency on the state stores, since only the stream thread that is assigned the stateful task owning the state store accesses it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       Yes, I think you should try to experiment with putAll()/range/reverseRange/prefixSeek operations as you proposed with a simple Kafka Streams app. That would be great to better understand the potential of direct buffers for Kafka Streams. Maybe experiment also with different key and value sizes. I am curious if we will also get such improvements.







[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-917118246


   Hi @vamossagar12 Thanks again for your patience! I know it has been dragging on a bit, and I had some discussions with @cadonna and @vamossagar12 regarding how we can move forward to eventually merge in your work to Apache Kafka. As a result of that, I created this ticket trying to summarize what we want to do (and it would incorporate this JIRA ticket):
   
   https://issues.apache.org/jira/browse/KAFKA-13286
   
   At the moment, our main concerns are that:
   
   1) Direct byte buffers rely on native memory that is not fully managed by the JVM (GC), and hence we need to be very careful about their side effects regarding memory pressure.
   2) The current implementation, without the API changes on serdes, shows a relatively small improvement in throughput, and in order to use the byte buffer without API changes we also need to "guess" the allocated size before serialization.
   
   So we think it's probably better to consider tackling the larger scope of https://issues.apache.org/jira/browse/KAFKA-13286 as a whole, which would mean we would hold off on merging this PR until we have the serde / byte-buffer supplier work in place. Does that sound reasonable to you? Would love to hear your thoughts.





[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703312659



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
    Also, we should try to avoid serializing data into byte[] arrays and then copying the data into direct buffers. Instead we should serialize directly into "direct" ByteBuffers. For this we might need to have RocksDBStore implement a ByteBuffer interface, e.g., KeyValueStore<Bytes, ByteBuffer>, or something similar...
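   As a rough illustration of what "serialize directly into direct ByteBuffers" could look like, a serializer could write into a caller-supplied buffer instead of returning a byte[]. This interface is purely hypothetical, not something Kafka Streams exposes today:
   
   ```
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   
   // Hypothetical serde shape: the store hands the serializer a (direct) buffer to fill.
   interface ByteBufferSerializer<T> {
       void serialize(T data, ByteBuffer target);
   }
   
   class StringByteBufferSerializer implements ByteBufferSerializer<String> {
       @Override
       public void serialize(final String data, final ByteBuffer target) {
           // The encoded bytes still exist briefly on the heap, but the store-facing copy
           // goes straight into the supplied buffer rather than into another byte[].
           target.put(data.getBytes(StandardCharsets.UTF_8));
       }
   }
   ```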







[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-876082875


   @guozhangwang , @cadonna  did you get a chance to look at the numbers? Any tweaks or other tests you would suggest?





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703469853



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
    I see. I think barring interactive queries, the other accesses are single threaded.
   Regarding the max size, TBH I am not aware of one.
   Also, regarding changing the API: yes, the RocksDB store isn't exposed, so the new APIs would have to be in KeyValueStore with a default implementation.
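   A hedged sketch of how such a default implementation could look if a ByteBuffer variant were ever added alongside the existing byte[]-based API (interface and method names are hypothetical and would need a KIP):
   
   ```
   import java.nio.ByteBuffer;
   
   // Hypothetical KeyValueStore-style interface: existing stores keep working through the
   // default method, while RocksDBStore could override it with a direct-buffer path.
   interface BufferAwareStore<K> {
       void put(K key, byte[] value);
   
       default void put(final K key, final ByteBuffer value) {
           final byte[] copy = new byte[value.remaining()];
           value.get(copy); // fall back to the byte[]-based API
           put(key, copy);
       }
   }
   ```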







[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-903313112


   Sorry @vamossagar12 for the late reply. I took a look at the current PR and it looks good to me. The throughput numbers seem promising as well. Could you clean up the PR a bit? Meanwhile, I will ping @cadonna for another look at the PR as well.





[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-887703246


   Hey @guozhangwang / @cadonna, sorry for being nosy here, but did you get a chance to look at these numbers?





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703336198



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       > Also, we should try to avoid serializing data into byte[] arrays and then copy the data into directBuffers. Instead we should serialize directly into "direct" ByteBuffers. For this we might need to have RocksDBStore implement a ByteBuffer interface, e.g., KeyValueStore<Bytes, ByteBuffer>, or anything similar...
   
   Yeah, I did play around with that idea, but the intention was to not change the public APIs. If that direction makes sense, I can do another round of benchmarking for it; we would probably need a KIP then.
   
   In fact, we could even go one step further and add support for only direct ByteBuffers, if that makes sense. We could throw an error if the ByteBuffer isn't direct, for example, similar to the RocksDB implementation. (A trivial guard for that is sketched below.)
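   A trivial sketch of such a guard (hypothetical helper, not part of the PR):
   
   ```
   import java.nio.ByteBuffer;
   
   final class DirectBufferChecks {
       private DirectBufferChecks() { }
   
       // Rejects heap buffers up front, as suggested above for a direct-only store variant.
       static void requireDirect(final ByteBuffer buffer) {
           if (!buffer.isDirect()) {
               throw new IllegalArgumentException("Only direct ByteBuffers are supported by this store");
           }
       }
   }
   ```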
   







[GitHub] [kafka] guozhangwang commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644356441



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       I'm not very familiar with the direct buffer usage pattern, but currently it seems we would still try to allocate a new buffer for each put call, whereas I "thought" the main benefits come from reusing the buffer across multiple put calls? @vamossagar12 @ableegoldman @cadonna please correct me if I'm wrong.







[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-910510701


   Tests failed on compilation due to checkstyle warnings:
   
   ```
   [2021-08-28T18:29:04.088Z] > Task :streams:streams-scala:compileTestScala
   
   [2021-08-28T18:29:04.088Z] [Warn] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10798/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala:19: imported `Named` is permanently hidden by definition of type Named in package kstream
   
   [2021-08-28T18:29:04.088Z] [Warn] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10798/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:24: imported `Named` is permanently hidden by definition of type Named in package kstream
   
   [2021-08-28T18:29:04.088Z] [Warn] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10798/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala:21: imported `Named` is permanently hidden by definition of type Named in package kstream
   
   [2021-08-28T18:29:05.982Z] 
   
   [2021-08-28T18:29:05.982Z] > Task :streams:checkstyleTest
   
   [2021-08-28T18:29:11.170Z] 
   
   [2021-08-28T18:29:11.170Z] > Task :streams:streams-scala:compileTestScala
   
   [2021-08-28T18:29:11.170Z] three warnings found
   ```





[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-911592247


   @guozhangwang , these errors are not due to the changes in this PR:
   
   `imported `Named` is permanently hidden by definition of type Named in package kstream` 
   
   
   





[GitHub] [kafka] cadonna commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-1027802506


   @vamossagar12 Can we close this PR and the corresponding ticket since it seems we decided to look for a more complete solution? 





[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-915194995


   > > Okay, let me re-trigger the tests.
   > 
   > Thanks.. This time there's a compilation error due to benchmarks. I will remove that class.
   
   @guozhangwang , I removed that class.





[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-870174294


   You were referring to this commit https://github.com/apache/kafka/pull/10798/commits/68a947c0eb6a5cc4bdde24083c83f4638e708edb as the tweaks, right?
   
   BTW it's a bit interesting to see that the improvement for range is around 1.442 / 1.131 = 1.27x while for putAll it is 1.065 / 0.919 = 1.15x. Given that the key length is similar in the benchmarks, I was expecting the latter to have a bigger benefit. @cadonna WDYT?





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644616944



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
    Thank you @guozhangwang , @cadonna . I think creating it every time does not make much sense. I should have been more careful before adding this and asking for the internal benchmarks. In that case, would it even make sense to have it in an API like put()? Should we instead use it only for the putAll()/range/reverseRange/prefixSeek operations?
   
   That's because in the case of put, it is difficult to know how many put operations may be requested. If users were using the RocksDB library directly, they could create DirectByteBuffers once and push as many entries as they want.
   
   Based upon my conversations on the RocksDB side, one of the comments was this:
   
   `Extracting large amounts of data under high concurrency, non-direct byte buffer will bring serious GC problems to the upper level Java services.`
   
   I guess we can target those APIs? WDYT?
   
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
    OK, that makes sense. High concurrency is one of the cases where this might be useful. Having said that, that PR has benchmark numbers for a large number of put operations done in a single-threaded manner. As per those numbers, the direct byte buffer was 37% faster and with 0 GC cycles.
   
   Here is the comment: https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
   
   The users of Kafka Streams might call put() in this manner, wherein they loop through a bunch of records and use put() to insert each one. From the state store side, we could create one DirectByteBuffer object for put() and keep reusing it, subject to testing. But that might not always be the case.
   







[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-872398600


   @guozhangwang , that's the commit I was referring to. Setting the byte order seems to have an impact on performance. Also, earlier I was calling put first and then calling flip, which is wrong, so I tweaked that.
   
   I have another update on this piece. I created a small Kafka Streams app which subscribes to a single-partition topic; for every record consumed, it runs 2 processors: 1) a putAll processor which ingests 1M records into the state store, and then 2) a range processor which does a range query for all the keys inserted.
   
   I pushed 100 records to the topic and these 2 processors got invoked each time. Here are the numbers from that run:
   
   ```
   DirectByteBuffer ->
   
   Operator: putAll, Real: 80.722 CPU: 273.690 GC: 6.974 GCCount: 36 avg throughput: 1269130.717 op/s p95 throughput: 1430952.065 op/s p99 throughput: 1453862.308 op/s 
   Operator: range, Real: 114.511 CPU: 271.410 GC: 5.501 GCCount: 26 avg throughput: 920462.473 op/s p95 throughput: 1234215.537 op/s p99 throughput: 1588364.216 op/s 
   ```
   
   ```
   Original ->
   
   Operator: putAll, Real: 92.988 CPU: 288.550 GC: 6.900 GCCount: 53 avg throughput: 1088404.000 op/s p95 throughput: 1233248.141 op/s p99 throughput: 1252514.840 op/s 
   Operator: range, Real: 110.418 CPU: 268.070 GC: 6.105 GCCount: 40 avg throughput: 957558.135 op/s p95 throughput: 1277487.581 op/s p99 throughput: 1388059.720 op/s 
   ```
   
   The way I calculate throughput: for every iteration, I divide 1M by the time taken for that iteration. You can find the relevant code here:
   
   https://github.com/vamossagar12/kafka-streams-throughputbenchmarking/tree/master/src/main/java/com/bytebyffer/benchmarks
   
   Let me know if you and @cadonna think we need other kinds of benchmarks.
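   For readers who want to reproduce this, a condensed, hedged sketch of the putAll side of the app described above (topology wiring omitted; the store name, key format, and record count are placeholders, and the real code lives in the linked repository). The range processor is analogous and drains a range() iterator over the same keys:
   
   ```
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.kafka.common.utils.Bytes;
   import org.apache.kafka.streams.KeyValue;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   import org.apache.kafka.streams.state.KeyValueStore;
   
   // For every input record, bulk-load 1M entries via putAll() and report the observed throughput.
   class PutAllBenchmarkProcessor implements Processor<String, String, Void, Void> {
       private static final int NUM_ENTRIES = 1_000_000;
       private KeyValueStore<Bytes, byte[]> store;
   
       @Override
       public void init(final ProcessorContext<Void, Void> context) {
           store = context.getStateStore("benchmark-store"); // placeholder store name
       }
   
       @Override
       public void process(final Record<String, String> record) {
           final List<KeyValue<Bytes, byte[]>> batch = new ArrayList<>(NUM_ENTRIES);
           for (int i = 0; i < NUM_ENTRIES; i++) {
               batch.add(KeyValue.pair(Bytes.wrap(("key-" + i).getBytes()), ("value-" + i).getBytes()));
           }
           final long start = System.nanoTime();
           store.putAll(batch);
           final double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
           // Throughput as described above: entries written divided by the elapsed time per iteration.
           System.out.printf("putAll throughput: %.2f op/s%n", NUM_ENTRIES / seconds);
       }
   }
   ```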





[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-915026729


   > Okay, let me re-trigger the tests.
   
   Thanks.. This time there's a compilation error due to benchmarks. I will remove that class.





[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r705268638



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -32,34 +34,49 @@
     private final byte[] rawLastKey;
     private final boolean forward;
     private final boolean toInclusive;
+    private final ByteBuffer directByteBuffer;
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                          final Bytes from,
                          final Bytes to,
                          final boolean forward,
-                         final boolean toInclusive) {
+                         final boolean toInclusive,
+                         final ByteBuffer directByteBuffer) {
         super(storeName, iter, openIterators, forward);
         this.forward = forward;
         this.toInclusive = toInclusive;
+        this.directByteBuffer = directByteBuffer;
         if (forward) {
             if (from == null) {
                 iter.seekToFirst();
             } else {
-                iter.seek(from.get());
+                allocateDirectByteBufferAndSeek(iter, from, true);
             }
             rawLastKey = to == null ? null : to.get();
         } else {
             if (to == null) {
                 iter.seekToLast();
             } else {
-                iter.seekForPrev(to.get());
+                allocateDirectByteBufferAndSeek(iter, to, false);
             }
             rawLastKey = from == null ? null : from.get();
         }
     }
 
+    private void allocateDirectByteBufferAndSeek(final RocksIterator iter, final Bytes bytes, final boolean forward) {
+        /*final ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(bytes.get().length);
+        directByteBuffer.order(ByteOrder.nativeOrder());*/
+        directByteBuffer.clear();
+        directByteBuffer.put(bytes.get());
+        directByteBuffer.flip();

Review comment:
       You might want to pull this up or into a separate method to avoid checking the direction twice
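   One hedged way to read this suggestion (helper name is illustrative): factor the buffer filling out into its own method, so the existing if/else on the direction calls iter.seek(...) or iter.seekForPrev(...) itself and the direction is never checked a second time. This assumes the ByteBuffer seek overloads of RocksIterator in newer RocksDB Java releases:
   
   ```
   import java.nio.ByteBuffer;
   
   final class DirectBufferSeekSupport {
       private DirectBufferSeekSupport() { }
   
       // Fills the reusable direct buffer with the boundary key exactly once; the caller,
       // which already knows whether it is iterating forward, then seeks with the buffer.
       static ByteBuffer fill(final ByteBuffer directByteBuffer, final byte[] boundaryKey) {
           directByteBuffer.clear();
           directByteBuffer.put(boundaryKey);
           directByteBuffer.flip();
           return directByteBuffer;
       }
   }
   ```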







[GitHub] [kafka] vamossagar12 closed pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 closed pull request #10798:
URL: https://github.com/apache/kafka/pull/10798


   





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r704376366



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
    @patrickstuedi , I resorted to creating only one instance of DirectByteBuffer each (one for the putAll keys and values and one for range). I have checked in that code for reference. However, I don't see much of a difference in terms of throughput numbers. I ran it on the same setup (pushing 100 messages to a single-partition topic and, for each message, pushing 1M keys using putAll and reading them back through a range() call).
   
   I did 3 runs per approach (original vs. ByteBuffer). Here are the numbers:
   
   ```
   ByteBuffer
   Operator: putAll, Real: 87.206 CPU: 371.460 GC: 11.048 GCCount: 30 avg throughput: 1206503.194 op/s p95 throughput: 1423370.427 op/s p99 throughput: 1437098.797 op/s 
   Operator: range, Real: 115.121 CPU: 328.090 GC: 7.925 GCCount: 20 avg throughput: 923332.052 op/s p95 throughput: 1261332.193 op/s p99 throughput: 1308671.917 op/s 
   
   Operator: putAll, Real: 87.274 CPU: 382.920 GC: 11.477 GCCount: 29 avg throughput: 1211901.123 op/s p95 throughput: 1432304.571 op/s p99 throughput: 1447671.672 op/s 
   Operator: range, Real: 116.886 CPU: 335.610 GC: 8.230 GCCount: 21 avg throughput: 911159.182 op/s p95 throughput: 1234183.861 op/s p99 throughput: 1255574.938 op/s
   
   putAll, Real: 84.438 CPU: 366.390 GC: 10.992 GCCount: 29 avg throughput: 1254820.856 op/s p95 throughput: 1481654.517 op/s p99 throughput: 1508043.762 op/s 
   Operator: range, Real: 114.877 CPU: 338.090 GC: 8.465 GCCount: 21 avg throughput: 935155.065 op/s p95 throughput: 1254738.341 op/s p99 throughput: 1280340.688 op/s
   
   
   Original
   Operator: putAll, Real: 95.037 CPU: 378.850 GC: 11.078 GCCount: 29 avg throughput: 1100406.576 op/s p95 throughput: 1292393.480 op/s p99 throughput: 1297963.396 op/s 
   Operator: range, Real: 111.177 CPU: 328.180 GC: 8.299 GCCount: 21 avg throughput: 967757.168 op/s p95 throughput: 1281079.326 op/s p99 throughput: 1296172.722 op/s
   
   Operator: putAll, Real: 95.186 CPU: 356.040 GC: 10.132 GCCount: 28 avg throughput: 1092794.645 op/s p95 throughput: 1257861.812 op/s p99 throughput: 1286016.834 op/s 
   Operator: range, Real: 112.568 CPU: 347.350 GC: 9.163 GCCount: 25 avg throughput: 952179.810 op/s p95 throughput: 1298717.792 op/s p99 throughput: 1323234.359 op/s 
   
   Operator: putAll, Real: 97.332 CPU: 400.690 GC: 12.000 GCCount: 30 avg throughput: 1079682.386 op/s p95 throughput: 1283925.766 op/s p99 throughput: 1290499.661 op/s 
   Operator: range, Real: 109.653 CPU: 319.020 GC: 7.995 GCCount: 20 avg throughput: 980557.905 op/s p95 throughput: 1298695.144 op/s p99 throughput: 1313645.941 op/s
   ```
   
   One thing that I am noticing is that range is better with the original implementation. This is contrary to the initial numbers I had posted [here](https://github.com/apache/kafka/pull/10798#issuecomment-872398600), which were wrong due to a bug in my implementation back then.
   
   Also, regarding the discussion we had about capacity allocation: RocksDB places a limit of 8 MB on keys and 3 GB on values. Either way, it doesn't recommend storing large keys/values. I wanted to know if there are any configs for key/value sizes for stores? I don't think I could find any.
   
   Lastly, with the pre-allocation approach for interactive queries, which allow multi-threaded access, do we want to restrict the pool to N buffers as you specified above?
   
   What do you and @guozhangwang / @cadonna think?







[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703439679



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
    Re (1): if all use cases are single threaded, then yes, we can allocate some buffer(s) as part of the store. Otherwise, if you need to support multiple concurrent ops, you could pre-populate a queue with N buffers, and N becomes the maximum number of concurrent requests you can serve. If your queue is empty, a request would have to wait until at least one of the outstanding requests completes and adds its buffer back to the queue. Again, that might all not be needed given the API is single-threaded.
   
   Re (2), is there a max size, maybe given by the maximum Kafka message size that is configured (if such a limit exists and is not too big)?
   
   If we don't want to change the API (I guess it would be the RocksDBStore interface that would be changed, which is not exposed, I think, but still), then splitting this work into a part I where we copy from heap into direct buffers, and a part II where we serialize directly into direct buffers, is the way to go.







[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-864174533


   @cadonna , @guozhangwang  I ran some JMH benchmarks on this. I ran the 2 tests with G1GC and the gc profiler => one for putAll and the other one for the range query. Here are the results for putAll:
   
   ```
   Original AK codebase
   
   Benchmark                                                                                          Mode  Cnt        Score        Error   Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance                                   thrpt   15       60.489 ?      2.154   ops/s
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.alloc.rate                    thrpt   15      158.731 ?      5.651  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.alloc.rate.norm               thrpt   15  2889264.628 ?     26.474    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.PS_Eden_Space           thrpt   15      160.811 ?      9.073  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.PS_Eden_Space.norm      thrpt   15  2927507.100 ? 141145.743    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.PS_Survivor_Space       thrpt   15        0.251 ?      0.145  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.PS_Survivor_Space.norm  thrpt   15     4536.878 ?   2486.338    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.count                         thrpt   15      161.000               counts
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.time  
   ```
   
   ```
   DirectByteBuffer
   
   Benchmark                                                                                          Mode  Cnt        Score        Error   Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance                                   thrpt   15       97.908 ?      1.820   ops/s
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.alloc.rate                    thrpt   15      256.946 ?      4.777  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.alloc.rate.norm               thrpt   15  2889553.977 ?      4.331    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Eden_Space           thrpt   15      256.202 ?     51.550  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Eden_Space.norm      thrpt   15  2879662.678 ? 563793.832    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Old_Gen              thrpt   15        0.030 ?      0.039  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Old_Gen.norm         thrpt   15      336.245 ?    437.382    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Survivor_Space       thrpt   15        0.025 ?      0.105  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.churn.G1_Survivor_Space.norm  thrpt   15      285.618 ?   1182.589    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.count                         thrpt   15       33.000               counts
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:?gc.time                          thrpt   15      308.000                   ms
   ```
   
   And here are the results for range:
   
   ```
   Original AK code
   Benchmark                                                                                              Mode  Cnt        Score        Error   Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance                                   thrpt   15      117.454 ?      2.620   ops/s
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.alloc.rate                    thrpt   15      290.118 ?      6.471  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.alloc.rate.norm               thrpt   15  2719700.009 ?     32.687    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Eden_Space           thrpt   15      295.066 ?     64.171  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Eden_Space.norm      thrpt   15  2768668.326 ? 613598.308    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Old_Gen              thrpt   15        0.002 ?      0.005  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Old_Gen.norm         thrpt   15       16.068 ?     47.296    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Survivor_Space       thrpt   15        0.051 ?      0.210  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Survivor_Space.norm  thrpt   15      466.034 ?   1929.591    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.count                         thrpt   15       38.000               counts
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.time                          thrpt   15      431.000                   ms
   ```
   
   ```
   DirectBytBuffer
   
   Benchmark                                                                                              Mode  Cnt        Score        Error   Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance                                   thrpt   15      114.871 ?      4.080   ops/s
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.alloc.rate                    thrpt   15      284.127 ?     10.090  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.alloc.rate.norm               thrpt   15  2723428.297 ?     33.503    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Eden_Space           thrpt   15      286.413 ?     64.054  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Eden_Space.norm      thrpt   15  2743440.651 ? 593666.103    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Old_Gen              thrpt   15        0.007 ?      0.024  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Old_Gen.norm         thrpt   15       67.053 ?    224.086    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Survivor_Space       thrpt   15        0.025 ?      0.105  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.churn.G1_Survivor_Space.norm  thrpt   15      239.812 ?    992.929    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.count                         thrpt   15       37.000               counts
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:?gc.time                          thrpt   15      515.000                   ms
   ```
   
   Throughput-wise, there was a jump of 30+ ops/s for the putAll case. range performed slightly worse than in the original codebase. I can change the benchmark mode to AverageTime for the range query.
   
   One thing I am noticing is that the DirectByteBuffer tests are consistently clocking higher GC allocation / GC count and GC time. You can find the benchmark-related code here: https://github.com/apache/kafka/pull/10842/files.
   
   One of my guesses is that for this to actually show some gains, the application shouldn't be making use of byte[] at all and should allocate directly. That doesn't seem possible with the way the state stores are exposed via the DSL, because ultimately everything is serialized into a byte[]. What do you think?
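   For context, the JMH harness behind these numbers follows the usual shape. A hedged, stripped-down skeleton is below (store setup and key generation omitted; the class name and option values are placeholders, and the actual benchmark is in the linked PR). The GC columns above come from attaching JMH's GC profiler:
   
   ```
   import java.util.concurrent.TimeUnit;
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Fork;
   import org.openjdk.jmh.annotations.Measurement;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.Warmup;
   import org.openjdk.jmh.profile.GCProfiler;
   import org.openjdk.jmh.runner.Runner;
   import org.openjdk.jmh.runner.RunnerException;
   import org.openjdk.jmh.runner.options.OptionsBuilder;
   
   @State(Scope.Benchmark)
   @BenchmarkMode(Mode.Throughput)
   @OutputTimeUnit(TimeUnit.SECONDS)
   @Warmup(iterations = 3)
   @Measurement(iterations = 5)
   @Fork(3) // 3 forks x 5 iterations = the "Cnt 15" shown in the tables above
   public class StreamsPersistentStoreBenchmarkSketch {
   
       @Benchmark
       public void testPersistentPutAllPerformance() {
           // putAll() of a pre-generated batch against the RocksDB store would go here.
       }
   
       @Benchmark
       public void testPersistentRangeQueryPerformance() {
           // range() over the pre-loaded keys, draining the iterator, would go here.
       }
   
       public static void main(final String[] args) throws RunnerException {
           new Runner(new OptionsBuilder()
               .include(StreamsPersistentStoreBenchmarkSketch.class.getSimpleName())
               .addProfiler(GCProfiler.class) // produces the gc.alloc.rate / gc.count / gc.time rows
               .build()).run();
       }
   }
   ```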
    
   
   





[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-919419275


   > Hey @guozhangwang, that makes perfect sense to me. Based upon the discussion @patrickstuedi and I had on this thread, I think that to reap the real benefits, the optimisations listed here and on the JIRA make sense.
   > We can hold off on this PR till then. Thanks!
   
   Great, thanks @vamossagar12 !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644672950



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       Ok, that makes sense. High concurrency is one of the cases where this might be useful. Having said that, the PR linked below has benchmarking numbers for a large number of put operations in a single-threaded manner, and per those numbers the direct byte buffer was 37% faster with 0 GC cycles.
   
   Here is the comment: https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
   
   Users of Kafka Streams might call put() in exactly this manner, looping through a bunch of records and inserting them one by one. On the state store side, we could create one DirectByteBuffer object for put() and keep reusing it (subject to testing), but that might not always be the case. 
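   
   To make the reuse idea concrete, here is a rough sketch (class and method names are made up, not the actual RocksDBStore members): keep one lazily grown direct buffer per store and refill it on every put() instead of allocating a new one.
   
   ```
   import java.nio.ByteBuffer;
   
   // Rough sketch only: one direct buffer kept across put() calls and grown on demand.
   class ReusableDirectBuffer {
       private ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024);
   
       // Copies the serialized bytes into the reused buffer instead of allocating per call.
       ByteBuffer fill(final byte[] bytes) {
           if (directBuffer.capacity() < bytes.length) {
               // Grow only when the payload outgrows the current buffer.
               directBuffer = ByteBuffer.allocateDirect(bytes.length);
           }
           directBuffer.clear();
           directBuffer.put(bytes);
           directBuffer.flip();
           return directBuffer;
       }
   }
   ```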
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-1035710408


   @cadonna , done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644633213



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       The only case I can think of that might have high concurrency on a RocksDB state store is interactive queries. Without interactive queries there is no concurrency on the state stores, since only the stream thread to which the stateful task owning the state store is assigned accesses that store.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644356441



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       I'm not very familiar with the direct buffer usage pattern, but currently it seems we would still try to allocate a new buffer for each put call, whereas I "thought" the main benefits come from reusing the buffer across multiple put calls? @vamossagar12 @ableegoldman @cadonna please correct me if I'm wrong.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-907668633


   Thanks @guozhangwang, I have made changes to the PR. I think it can be reviewed now, so I have moved it from draft to ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 edited a comment on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 edited a comment on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-864174533


   @cadonna, @guozhangwang I ran some JMH benchmarks on this: two tests with G1GC and the GC profiler enabled, one for putAll and the other for a range query. Here are the results for putAll:
   
   ```
   Original AK codebase
   
   Benchmark                                                                                          Mode  Cnt        Score        Error   Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance                                   thrpt   15       60.489 ±      2.154   ops/s
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.alloc.rate                    thrpt   15      158.731 ±      5.651  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.alloc.rate.norm               thrpt   15  2889264.628 ±     26.474    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.PS_Eden_Space           thrpt   15      160.811 ±      9.073  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.PS_Eden_Space.norm      thrpt   15  2927507.100 ± 141145.743    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.PS_Survivor_Space       thrpt   15        0.251 ±      0.145  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.PS_Survivor_Space.norm  thrpt   15     4536.878 ±   2486.338    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.count                         thrpt   15      161.000               counts
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.time  
   ```
   
   ```
   DirectByteBuffer
   
   Benchmark                                                                                          Mode  Cnt        Score        Error   Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance                                   thrpt   15       97.908 ±      1.820   ops/s
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.alloc.rate                    thrpt   15      256.946 ±      4.777  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.alloc.rate.norm               thrpt   15  2889553.977 ±      4.331    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Eden_Space           thrpt   15      256.202 ±     51.550  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Eden_Space.norm      thrpt   15  2879662.678 ± 563793.832    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Old_Gen              thrpt   15        0.030 ±      0.039  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Old_Gen.norm         thrpt   15      336.245 ±    437.382    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Survivor_Space       thrpt   15        0.025 ±      0.105  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.churn.G1_Survivor_Space.norm  thrpt   15      285.618 ±   1182.589    B/op
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.count                         thrpt   15       33.000               counts
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance:·gc.time                          thrpt   15      308.000                   ms
   ```
   
   And here are the results for range:
   
   ```
   Original AK code
   Benchmark                                                                                              Mode  Cnt        Score        Error   Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance                                   thrpt   15      117.454 ±      2.620   ops/s
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate                    thrpt   15      290.118 ±      6.471  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate.norm               thrpt   15  2719700.009 ±     32.687    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space           thrpt   15      295.066 ±     64.171  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space.norm      thrpt   15  2768668.326 ± 613598.308    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen              thrpt   15        0.002 ±      0.005  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen.norm         thrpt   15       16.068 ±     47.296    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space       thrpt   15        0.051 ±      0.210  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space.norm  thrpt   15      466.034 ±   1929.591    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.count                         thrpt   15       38.000               counts
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.time                          thrpt   15      431.000                   ms
   ```
   
   ```
   DirectByteBuffer
   
   Benchmark                                                                                              Mode  Cnt        Score        Error   Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance                                   thrpt   15      114.871 ±      4.080   ops/s
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate                    thrpt   15      284.127 ±     10.090  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.alloc.rate.norm               thrpt   15  2723428.297 ±     33.503    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space           thrpt   15      286.413 ±     64.054  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Eden_Space.norm      thrpt   15  2743440.651 ± 593666.103    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen              thrpt   15        0.007 ±      0.024  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Old_Gen.norm         thrpt   15       67.053 ±    224.086    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space       thrpt   15        0.025 ±      0.105  MB/sec
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.churn.G1_Survivor_Space.norm  thrpt   15      239.812 ±    992.929    B/op
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.count                         thrpt   15       37.000               counts
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance:·gc.time                          thrpt   15      515.000                   ms
   ```
   
   Throughput-wise, there was a jump of 30+ ops/s for the putAll case, while range performed slightly worse than with the original codebase. I can change the benchmark mode to AverageTime for the range query. 
   
   One thing I am noticing is that the DirectByteBuffer tests consistently clock higher GC allocation, GC count, and GC time. You can find the benchmark-related code here: https://github.com/apache/kafka/pull/10842/files.
    
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-870174294


   You were referring to this commit https://github.com/apache/kafka/pull/10798/commits/68a947c0eb6a5cc4bdde24083c83f4638e708edb for the tweaks, right?
   
   BTW it's a bit interesting to see that the improvement for range is around 1.442 / 1.131 = 1.27 while for putAll it is 1.065 / 0.919 = 1.15. Given that the key lengths are similar in the benchmarks, I was expecting the latter to have the bigger benefit. @cadonna WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-917365748


   Hey @guozhangwang, that makes perfect sense to me. Based upon the discussion @patrickstuedi and I had on this thread, I think that to reap the real benefits, the optimisations listed here and on the JIRA make sense. 
   We can hold off on this PR till then. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703309462



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       @vamossagar12 great to see you're about to add direct buffer support; I was actually also looking into this. Just a few comments: as pointed out already, one important aspect to keep in mind with direct buffers is that allocation and disposal are more expensive than with regular heap buffers, so it's important to re-use them. It's common practice to keep a pool of direct buffers around and re-use them on a per-call basis (most I/O libraries do this, e.g., Netty, Crail, etc.). You can keep queues of free and used buffers and move them around during state store operations, as sketched below. 
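   
   As a rough, illustrative sketch of the pooling idea (not code from this PR, and assuming the single-threaded access pattern discussed above):
   
   ```
   import java.nio.ByteBuffer;
   import java.util.ArrayDeque;
   
   // Illustrative pool: borrow a direct buffer per state store call and return it afterwards.
   class DirectBufferPool {
       private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
       private final int minCapacity;
   
       DirectBufferPool(final int minCapacity) {
           this.minCapacity = minCapacity;
       }
   
       ByteBuffer acquire(final int requiredCapacity) {
           final ByteBuffer pooled = free.pollFirst();
           if (pooled != null && pooled.capacity() >= requiredCapacity) {
               pooled.clear();
               return pooled;
           }
           // Allocate only when the pool is empty or the pooled buffer is too small;
           // an undersized buffer is simply dropped and left to the garbage collector.
           return ByteBuffer.allocateDirect(Math.max(requiredCapacity, minCapacity));
       }
   
       void release(final ByteBuffer buffer) {
           free.offerFirst(buffer);
       }
   }
   ```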




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-914624644


   Okay, let me re-trigger the tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-898433681


   @cadonna, @guozhangwang please review the numbers whenever you get the chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644566174



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       The [javadocs](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html) say
   
   > The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers.
   
   So it seems allocating a direct buffer during each put might decrease performance. The internal benchmarks we ran on this PR did not show any increase in throughput, but rather a decrease. However, I do not think the decrease was significant, so the cause could also just be a bad day of the environment. Anyway, continuously allocating a direct buffer does not seem to be a good idea, as the javadocs also say:
   
   > It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance. 
      
   Another thing to consider:
   
   > The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-872399579


   > You were referring to this commit [68a947c](https://github.com/apache/kafka/commit/68a947c0eb6a5cc4bdde24083c83f4638e708edb) for the tweaks, right?
   > 
   > BTW it's a bit interesting to see that the improvement for range is around 1.442 / 1.131 = 1.27 while for putAll it is 1.065 / 0.919 = 1.15. Given that the key lengths are similar in the benchmarks, I was expecting the latter to have the bigger benefit. @cadonna WDYT?
   
   Yeah, range performed better on JMH. However, for one of the runs putAll clocked around 1.095 and above. Every run tends to give a slightly different number, but it's on the higher side compared to the non-byte-buffer version.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r644699017



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
         }
     }
 
+    private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
       Yes, I think you should try to benchmark putAll()/range/reverseRange/prefixSeek operations with a simple Kafka Streams app, as you proposed (see the sketch below). That would be great to better understand the potential of direct buffers for Kafka Streams. Maybe also experiment with different key and value sizes.
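   
   A rough outline of what such an app-level benchmark could look like (topic, store, and key names are made up; the measurement harness and waiting for the RUNNING state are omitted):
   
   ```
   import java.util.Properties;
   
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.common.utils.Bytes;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StoreQueryParameters;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.Materialized;
   import org.apache.kafka.streams.state.KeyValueIterator;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.QueryableStoreTypes;
   import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
   
   public class StoreRangeBenchmarkSketch {
       public static void main(final String[] args) {
           final Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "direct-buffer-bench");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   
           // Materialise a topic into a persistent RocksDB-backed store.
           final StreamsBuilder builder = new StreamsBuilder();
           builder.table(
               "bench-input",
               Consumed.with(Serdes.String(), Serdes.String()),
               Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("bench-store"));
   
           final KafkaStreams streams = new KafkaStreams(builder.build(), props);
           streams.start();
   
           // Exercise range() through interactive queries once the instance is running.
           final ReadOnlyKeyValueStore<String, String> store = streams.store(
               StoreQueryParameters.fromNameAndType("bench-store", QueryableStoreTypes.keyValueStore()));
           try (final KeyValueIterator<String, String> iter = store.range("key-0000", "key-9999")) {
               while (iter.hasNext()) {
                   iter.next();
               }
           }
   
           streams.close();
       }
   }
   ```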




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org