Posted to jira@kafka.apache.org by "dpcollins-google (via GitHub)" <gi...@apache.org> on 2023/01/24 20:48:45 UTC

[GitHub] [kafka] dpcollins-google opened a new pull request, #13162: fix: replace an inefficient loop in kafka internals

dpcollins-google opened a new pull request, #13162:
URL: https://github.com/apache/kafka/pull/13162

   Instead, use Channels.newChannel to write in larger chunks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] github-actions[bot] commented on pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13162:
URL: https://github.com/apache/kafka/pull/13162#issuecomment-1585813278

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch).
   If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.




Re: [PR] fix: replace an inefficient loop in kafka internals [kafka]

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1375307580


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   Sorry for the delay. I was trying to understand how to show the improvement, but we seem to always pass a heap byte buffer. How can I reproduce this improvement?
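   For anyone wanting to try this outside of Kafka: the non-array branch is only taken when the buffer reports `hasArray() == false`, e.g. a direct buffer. The standalone sketch below is illustrative only (class name and sizes are made up), not part of the PR:
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   
   public class DirectBufferWriteDemo {
       public static void main(String[] args) throws IOException {
           // Direct buffers have no backing array, so hasArray() is false and
           // Utils.writeTo would take the Channels.newChannel(out).write(buffer) branch.
           ByteBuffer direct = ByteBuffer.allocateDirect(2048);
           for (int i = 0; i < direct.capacity(); i++) {
               direct.put((byte) i);
           }
           direct.flip();
   
           ByteArrayOutputStream baos = new ByteArrayOutputStream();
           DataOutputStream out = new DataOutputStream(baos);
           Channels.newChannel(out).write(direct);
   
           System.out.println("hasArray=" + direct.hasArray() + ", wrote " + baos.size() + " bytes");
       }
   }
   ```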





[GitHub] [kafka] ijuma commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1104017807


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   Thanks for the PR. Can you please provide a bit more detail regarding the workload? I assume we're talking about the producer here. If so, can you please share whether compression was used, the compression algorithm (if applicable) and the average message size.





[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "dpcollins-google (via GitHub)" <gi...@apache.org>.
dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1121810375


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   ping @ijuma / @Hangleton, are there any blockers to getting this merged?





[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1088967227


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   - Is this preserving the same behaviour, i.e. copying the range `[pos, pos + length)`?
   - This mutates the position of the `buffer`, unlike the current implementation (a small demo of this follows below).
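   A standalone snippet that makes the second point visible (names are illustrative, not from the PR): the old loop used absolute `get(i)` and left `position()` alone, whereas `WritableByteChannel.write` consumes the buffer and advances its position to the limit.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   
   public class PositionMutationDemo {
       public static void main(String[] args) throws IOException {
           ByteBuffer buffer = ByteBuffer.allocateDirect(16);
           buffer.putLong(42L).putLong(7L).flip();
   
           DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
           System.out.println("position before write: " + buffer.position()); // 0
   
           // Unlike the absolute-index loop, this consumes the buffer.
           Channels.newChannel(out).write(buffer);
           System.out.println("position after write:  " + buffer.position()); // 16
       }
   }
   ```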





[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "dpcollins-google (via GitHub)" <gi...@apache.org>.
dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1104589735


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   This is producer code, with no compression (the data is pre-encrypted, so compression would be useless anyway), and the message size is 1-2 kB.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1090683591


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   Thanks for the follow-up. 
   
   1. This new implementation ignores the `length` argument provided to the method if the buffer is not backed by an array. What if `length` does not equal the number of bytes remaining in the buffer? (A length-honouring variant is sketched after this comment.)
   
   2. Is there an actual optimization offered by calling `write`? The implementation for direct buffers uses a similar index-based iteration.
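   For what it's worth, point 1 could be addressed while keeping the chunked write by limiting an independent view of the buffer. This is a hypothetical helper, not code from the PR (it assumes `length <= buffer.remaining()`, which holds at the existing call sites):
   
   ```java
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   
   final class LengthHonouringWrite {
       // Writes exactly `length` bytes starting at the buffer's current position,
       // without touching the caller's position or limit.
       static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
           ByteBuffer view = buffer.duplicate();      // shares the bytes, independent position/limit
           view.limit(view.position() + length);      // cap the write at `length` bytes
           Channels.newChannel(out).write(view);      // still writes in large chunks
       }
   }
   ```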





[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "dpcollins-google (via GitHub)" <gi...@apache.org>.
dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1089108841


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   1) Yes, per the docs of WritableByteChannel
   
   ```
   Writes a sequence of bytes to this channel from the given buffer.
   An attempt is made to write up to r bytes to the channel, where r is the number of bytes remaining in the buffer, that is, src.remaining(), at the moment this method is invoked.
   
   Suppose that a byte sequence of length n is written, where 0 <= n <= r. This byte sequence will be transferred from the buffer starting at index p, where p is the buffer's position at the moment this method is invoked; the index of the last byte written will be p + n - 1. Upon return the buffer's position will be equal to p + n; its limit will not have changed.
   ```
   
   2) Good point, although I don't think this has an effect on any of the 4 current users (DefaultRecord.writeTo and LegacyRecord.writeTo, for writing key and value); I've added a defensive call to asReadOnlyBuffer (sketched below).
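   Presumably the defensive call mentioned above ends up along these lines (a sketch, not the exact diff): `asReadOnlyBuffer()` creates an independent view, so only the view's position is advanced by the write.
   
   ```java
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   
   final class DefensiveWriteSketch {
       static void writeTo(DataOutputStream out, ByteBuffer buffer) throws IOException {
           // The read-only view has its own position/limit; the caller's buffer
           // is left exactly as it was.
           Channels.newChannel(out).write(buffer.asReadOnlyBuffer());
       }
   }
   ```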





[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1122285463


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   Apologies for the delay - would you have any JMH benchmark for this change? E.g. something like in #13312.
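   In case it helps, a minimal JMH harness for this comparison might look like the sketch below. Benchmark and class names are made up; it assumes a ~2 kB direct buffer (matching the workload described earlier) and an in-memory sink:
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   import java.util.concurrent.TimeUnit;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   
   @State(Scope.Thread)
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.NANOSECONDS)
   public class WriteToBenchmark {
   
       private ByteBuffer directBuffer;
       private ByteArrayOutputStream baos;
       private DataOutputStream out;
   
       @Setup
       public void setup() {
           directBuffer = ByteBuffer.allocateDirect(2048);   // ~2 kB, non-heap
           for (int i = 0; i < directBuffer.capacity(); i++) {
               directBuffer.put((byte) i);
           }
           directBuffer.flip();
           baos = new ByteArrayOutputStream(4096);
           out = new DataOutputStream(baos);
       }
   
       @Benchmark
       public int byteByByteLoop() throws IOException {
           baos.reset();
           int pos = directBuffer.position();
           int length = directBuffer.remaining();
           for (int i = pos; i < pos + length; i++) {
               out.writeByte(directBuffer.get(i));           // the old per-byte path
           }
           return baos.size();
       }
   
       @Benchmark
       public int channelWrite() throws IOException {
           baos.reset();
           // Write from a duplicate so the benchmark buffer is not consumed between invocations.
           Channels.newChannel(out).write(directBuffer.duplicate());
           return baos.size();
       }
   }
   ```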





[GitHub] [kafka] Playerharperb commented on pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "Playerharperb (via GitHub)" <gi...@apache.org>.
Playerharperb commented on PR #13162:
URL: https://github.com/apache/kafka/pull/13162#issuecomment-1403125466

   Hello my name is playerharperb@gmail.com and I am interested in this job posting on the website 




[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "dpcollins-google (via GitHub)" <gi...@apache.org>.
dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1091834852


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   Sure. In a particular workload, this code path was about 30% of CPU usage in flamegraphs. It is now 2-3% after a local patch.
   
   This hasn't been discussed on the dev mailing list; it's just an attempt to upstream a small performance improvement.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1090683591


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   Thanks for the follow-up. 
   
   1. This new implementation ignores the `length` argument provided to the method if the buffer is not backed by an array. What if `length` does not equal the number of remaining bytes on the buffer?
   
   2. Is there an actual optimization offered by calling `write`? The implementation for direct buffers uses a similar linear iteration. Do you have data showing performance improvements with this implementation?





[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "dpcollins-google (via GitHub)" <gi...@apache.org>.
dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1090733865


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   Per 1): This parameter is always buffer.remaining() at the call sites, so I've cleaned them up and removed the parameter.
   
   Per 2): Yes, it's substantial. The reason is that WritableByteChannelImpl writes in 8 KB chunks when feasible, instead of 1-byte chunks: https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/2544d2a351eca1a3d62276f969dd2d95e4a4d2b6/jdk/src/share/classes/java/nio/channels/Channels.java#L442 (a small self-contained demo of the effect is sketched below).
   
   Unfortunately I can't share benchmarks to demonstrate this, as they are from a production application and were collected with internal tooling.
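   The chunking effect itself can be shown without production data by counting calls into a dummy sink. Class and variable names below are illustrative; the ~8 KB chunk size is the TRANSFER_SIZE used by the JDK implementation linked above:
   
   ```java
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.OutputStream;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   
   public class WriteCallCountDemo {
       // Counts invocations of the underlying stream to make the chunking visible.
       static final class CountingOutputStream extends OutputStream {
           int calls = 0;
           @Override public void write(int b) { calls++; }
           @Override public void write(byte[] b, int off, int len) { calls++; }
       }
   
       public static void main(String[] args) throws IOException {
           ByteBuffer buffer = ByteBuffer.allocateDirect(64 * 1024); // position 0, limit 64 KiB
   
           CountingOutputStream loopSink = new CountingOutputStream();
           DataOutputStream loopOut = new DataOutputStream(loopSink);
           for (int i = 0; i < buffer.remaining(); i++) {
               loopOut.writeByte(buffer.get(i));   // one stream call per byte
           }
   
           CountingOutputStream channelSink = new CountingOutputStream();
           Channels.newChannel(new DataOutputStream(channelSink)).write(buffer.duplicate());
   
           System.out.println("byte-by-byte stream calls: " + loopSink.calls);    // 65536
           System.out.println("channel stream calls:      " + channelSink.calls); // ~8 with 8 KB chunks
       }
   }
   ```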





[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1091027869


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   Got it. Thanks. 
   
   Could you share the performance gains at a high level, without going into the details of the application?
   
   Publicly available data showing the expected gains would back up the PR further. Has this already been discussed on the dev mailing list?


