You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/22 13:09:01 UTC

[GitHub] [kafka] divijvaidya opened a new pull request, #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

divijvaidya opened a new pull request, #12890:
URL: https://github.com/apache/kafka/pull/12890

   ## Motivation 
   We create an instance of ObjectSerializationCache  at https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L113 which does not get used at all. We always "add" to the cache but never retrieve from it (as is evident by the fact that we don't store the reference of the cache anywhere).
   
   Adding information to the cache is expensive because it uses `System.identityHashCode(Object)` which is expensive as demonstrated by the flame graph of producer requests over Apache Kafka 3.3.1 plaintext broker.
   
   ![Screenshot 2022-11-21 at 19 23 53](https://user-images.githubusercontent.com/71267/203320985-c08e4447-cc8e-40d2-88f5-69eb454ef63b.png)
   
   ## Change
   Currently, the header of a request is parsed by the processor thread prior to adding it to RequestChannel. With this change, we cache the computed size in the `RequestHeader` object itself at the time of parsing the bytebuffer into a `RequestHeader` object. The cached size is re-used when it is required at `RequestChannel`.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12890:
URL: https://github.com/apache/kafka/pull/12890#issuecomment-1353397300

   @ijuma yes, there was a warm up workload prior to profiling. The JVM was probably alive for ~7-8 min. before the profile capture started. What are the fishy things that you notice here? I can try again on a long running server if you like.
   
   I am going to be using a similar profiler as a motivation for some future changes that I have lined up (I am currently writing a JMH benchmark for my other ArrayBuffer vs. ListBuffer PR) and I want to ensure we are on the same page wrt it's effectiveness. Hence, let's resolve this. What can I change in my setup that can help us understand the flamegraph better? I would be happy to jump on a call too to explain my setup if that makes things faster or we can use the public slack channel (ASF workspace, #kafka channel) to communicate faster on this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12890:
URL: https://github.com/apache/kafka/pull/12890#issuecomment-1328810024

   Test failures are unrelated.
   ```
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.internals.CooperativeConsumerCoordinatorTest.testOutdatedCoordinatorAssignment()
   
   Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testFailureToFenceEpoch(String).quorum=kraft
   
   Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=kraft
   
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldPrefixAllInternalTopicNamesWithNamedTopology
   
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
    ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison merged pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
mimaison merged PR #12890:
URL: https://github.com/apache/kafka/pull/12890


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12890:
URL: https://github.com/apache/kafka/pull/12890#discussion_r1032514676


##########
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java:
##########
@@ -75,7 +77,17 @@ public void write(ByteBuffer buffer, ObjectSerializationCache serializationCache
     }
 
     public int size(ObjectSerializationCache serializationCache) {
-        return data.size(serializationCache, headerVersion);
+        if (this.size == SIZE_NOT_INITIALIZED) {

Review Comment:
   1. I changed the access modifier for this method to default. It is used by tests hence, I am currently not changing to private.
   
   2. I have made similar changes to RespondeHeader as well to prevent incorrect future usage of the size() method over there that would cause similar performance problems.
   
   3. Added JavaDocs primarily meant for contributors to ensure that they use the correct intended method.
   
   4. In this refactor, the size(ObjectSerializationCache) method calculates the size every time (instead of using the cached value). This should remove the double equality checks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12890:
URL: https://github.com/apache/kafka/pull/12890#discussion_r1032516200


##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -110,7 +110,7 @@ object RequestChannel extends Logging {
 
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    def sizeInBytes: Int = header.size(new ObjectSerializationCache) + sizeOfBodyInBytes
+    def sizeInBytes: Int = header.size + sizeOfBodyInBytes

Review Comment:
   Note for reviewers: 
   This is the main change in this PR which impacts performance where we are using the cached value instead of calculating it again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12890:
URL: https://github.com/apache/kafka/pull/12890#issuecomment-1323654501

   @clolov please review when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12890:
URL: https://github.com/apache/kafka/pull/12890#issuecomment-1352674511

   Out of curiosity, what profiler was used to compute this? Also, did we see an actual improvement (eg was cpu usage or latency lower after the change)? I ask because profilers are known to have safepoint bias and can incorrectly attribute the cost when it comes to certain method types.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12890:
URL: https://github.com/apache/kafka/pull/12890#issuecomment-1352933410

   Hey @ijuma - thank you for your question.
   
   1. I used [Amazon CodeGuru Profiler](https://docs.aws.amazon.com/codeguru/latest/profiler-ug/what-is-codeguru-profiler.html). I ran the test again with with [async-profiler](https://github.com/jvm-profiling-tools/async-profiler) which should not suffer from safepoint bias. Please find the below image for the usage of the function. It demonstrates that HashCode evaluation is not the major component but calculating the size takes quite some time. After this change, the flamegraph does not have this stack frame at all (since we don't parse the header again). The flamegraphs is in html format so I can't add it here but will be happy to share with you offline in case you are interested. I am running this with Correto JDK 11 and `./profiler.sh -d 500 -i 40ms -e cycles` as the profiler configuration.
   
   ![Screenshot 2022-12-15 at 12 16 04](https://user-images.githubusercontent.com/71267/207845907-aee8a6a2-ce06-4203-9acc-55e6a7272aee.png)
   
   2. Irrespective of the CPU usage optimisation that this change brings, I think that this is a good change because it avoids parsing the request header twice.
   
   3. The actual CPU standard deviation is more than 2-3% during the duration of the test and hence, I cannot reliably say that I observed lower CPU usage after this change. Latency measurement is also not reliable because the bottleneck is thread contention on partition lock in UnifiedLog and hence, this change won't change the latency.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12890:
URL: https://github.com/apache/kafka/pull/12890#issuecomment-1326424695

   @mimaison please take a look when you get a chance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12890:
URL: https://github.com/apache/kafka/pull/12890#discussion_r1032518261


##########
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java:
##########
@@ -122,11 +158,12 @@ public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         RequestHeader that = (RequestHeader) o;
-        return this.data.equals(that.data);
+        return headerVersion == that.headerVersion &&
+            Objects.equals(data, that.data);
     }
 
     @Override
     public int hashCode() {
-        return this.data.hashCode();
+        return Objects.hash(data, headerVersion);

Review Comment:
   Piggybacking this minor change where we now include headerVersion in equality comparison for two RequestHeader objects.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12890:
URL: https://github.com/apache/kafka/pull/12890#issuecomment-1353303765

   Was this profile taken after the broker was running for long enough for the JIT compilation to have completed? The profile seems to show some things that look a bit odd.
   
   That said, I think the change is fine overall. It makes sense to avoid the map allocation (including underlying array) and the overhead of mutating it (including array resizes required for it). As you said, it also makes sense to avoid parsing the request header twice.
   
   My questions were mostly so that we understand what we are trying to achieve and we have the right understanding of the underlying reason. This helps ensure future changes are not based on incorrect conclusions.
   
   Thanks for the improvement!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12890:
URL: https://github.com/apache/kafka/pull/12890#issuecomment-1353646247

   @divijvaidya Thanks. Can you please attach the html file? I think it's possible to do it here, but if not then the JIRA would be helpful. I can take a closer look on the bits that seemed a bit suspicious.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
mimaison commented on code in PR #12890:
URL: https://github.com/apache/kafka/pull/12890#discussion_r1032261055


##########
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java:
##########
@@ -75,7 +77,17 @@ public void write(ByteBuffer buffer, ObjectSerializationCache serializationCache
     }
 
     public int size(ObjectSerializationCache serializationCache) {
-        return data.size(serializationCache, headerVersion);
+        if (this.size == SIZE_NOT_INITIALIZED) {

Review Comment:
   I think this method does not need to be public anymore, callers from other packages use the new overload. 
   Also on the golden path `size() -> size(ObjectSerializationCache serializationCache)` we do the equality check twice. If we inline it, does it make a difference? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12890:
URL: https://github.com/apache/kafka/pull/12890#issuecomment-1352675158

   More details on identity hash code here: https://shipilev.net/jvm/anatomy-quarks/26-identity-hash-code/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12890:
URL: https://github.com/apache/kafka/pull/12890#discussion_r1032514676


##########
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java:
##########
@@ -75,7 +77,17 @@ public void write(ByteBuffer buffer, ObjectSerializationCache serializationCache
     }
 
     public int size(ObjectSerializationCache serializationCache) {
-        return data.size(serializationCache, headerVersion);
+        if (this.size == SIZE_NOT_INITIALIZED) {

Review Comment:
   1. I changed the access modifier for this method to default. It is used by tests hence, I am currently not changing to private.
   
   2. I have made similar changes to RespondeHeader as well to prevent incorrect future usage of the size() method over there that would cause similar performance problems.
   
   3. Added JavaDocs primarily meant for contributors to ensure that they use the correct intended method.
   
   4. In this refactor, the size(ObjectSerializationCache) method calculates the size every time (instead of using the cached value). This should remove the double equality checks.
   
   5. Also added some unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12890: KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12890:
URL: https://github.com/apache/kafka/pull/12890#discussion_r1032518261


##########
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java:
##########
@@ -122,11 +158,12 @@ public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         RequestHeader that = (RequestHeader) o;
-        return this.data.equals(that.data);
+        return headerVersion == that.headerVersion &&
+            Objects.equals(data, that.data);
     }
 
     @Override
     public int hashCode() {
-        return this.data.hashCode();
+        return Objects.hash(data, headerVersion);

Review Comment:
   Edit: Note for reviewers
   
   Piggybacking this minor change where we now include headerVersion in equality comparison for two RequestHeader objects.



##########
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java:
##########
@@ -122,11 +158,12 @@ public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         RequestHeader that = (RequestHeader) o;
-        return this.data.equals(that.data);
+        return headerVersion == that.headerVersion &&
+            Objects.equals(data, that.data);
     }
 
     @Override
     public int hashCode() {
-        return this.data.hashCode();
+        return Objects.hash(data, headerVersion);

Review Comment:
   Note for reviewers
   
   Piggybacking this minor change where we now include headerVersion in equality comparison for two RequestHeader objects.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org