Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/21 12:45:04 UTC

[GitHub] [kafka] LinShunKang opened a new pull request, #12545: Perf: Reduce Fetcher#parseRecord() memory copy

LinShunKang opened a new pull request, #12545:
URL: https://github.com/apache/kafka/pull/12545

   # Motivation
   * Reduce `Fetcher#parseRecord()` memory copies: we can deserialize directly from the ByteBuffer instead of first copying it into a byte[].
   
   # Proposed Change
   * Add a default method `deserialize(String, Headers, ByteBuffer)` to `Deserializer`;
   * Invoke `Deserializer#deserialize(String, Headers, ByteBuffer)` instead of `Deserializer#deserialize(String, Headers, byte[])` in `Fetcher#parseRecord(TopicPartition, RecordBatch, Record)`; a sketch of the new default method follows this list.
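
   A minimal sketch of what the proposed default method could look like, assuming it falls back to the existing byte[] overload so current implementations keep working unchanged (`Utils.toArray` is Kafka's existing ByteBuffer-to-array helper; the PR's actual code may differ):
   ```java
   import java.nio.ByteBuffer;

   import org.apache.kafka.common.header.Headers;
   import org.apache.kafka.common.utils.Utils;

   // Hypothetical interface illustrating the proposed addition.
   public interface SketchDeserializer<T> {

       // Existing byte[]-based method.
       T deserialize(String topic, Headers headers, byte[] data);

       // New overload: deserialize directly from the record's ByteBuffer.
       // Defaulting to the byte[] path keeps existing implementations working;
       // deserializers that can read a ByteBuffer directly override it to
       // skip the intermediate copy.
       default T deserialize(String topic, Headers headers, ByteBuffer data) {
           return deserialize(topic, headers, data == null ? null : Utils.toArray(data));
       }
   }
   ```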
   
   # Changed Public Interfaces
   * Deserializer
   * StringDeserializer
   * ByteBufferDeserializer
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[GitHub] [kafka] philipnee commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r977892155


##########
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##########
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
  */
 public class StringDeserializer implements Deserializer<String> {
+

Review Comment:
   the extra blank line :)





[GitHub] [kafka] showuon commented on pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1505060095

   I will take a look this week. Sorry for the delay.




[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1174317140


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   > I use ByteBuffer.getDouble(int) instead of ByteBuffer.getDouble(), considering that ByteBuffer.getDouble(int) does not modify the position of the ByteBuffer.
   
   I get it. We'll need the position unchanged to get the buffer size (buffer.remaining()). Thanks for the explanation.





[GitHub] [kafka] philipnee commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1006246642


##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -263,6 +264,16 @@ public String deserialize(String topic, byte[] data) {
                     return super.deserialize(topic, data);
                 }
             }
+
+            @Override
+            public String deserialize(String topic, Headers headers, ByteBuffer data) {
+                if (i == recordIndex) {
+                    throw new SerializationException();
+                } else {

Review Comment:
   you actually don't need the else there, because it will just throw.  It would be a bit cleaner if you do:
   ```
   if (i == recordIndex) {
     throw new SerializationException();
   }
   
   i++;
   return super.deserialize(topic, headers, data);
   ```





[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170898685


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   @showuon 
   Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[], and the current byte order of ByteBuffer may not be BIG_ENDIAN, we set the byte order of ByteBuffer to BIG_ENDIAN to be consistent with the byte order used when reading from byte[].





[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1174314208


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   @LinShunKang 
   > Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[], and the current byte order of ByteBuffer may not be BIG_ENDIAN, we set the byte order of ByteBuffer to BIG_ENDIAN to be consistent with the byte order used when reading from byte[].
   
   Again, from the [javadoc](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#getDouble--):
   
   > Reads the next eight bytes at this buffer's current position, composing them into a double value according to the current byte order, and then increments the position by eight.
   
   The byte order is already considered while reading from the byte buffer. That means, this is not true:
   > Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[]
   
   It will read from byte[] using the current byte order (BIG_ENDIAN or LITTLE_ENDIAN). Did I miss anything?





[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1174332358


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   > @LinShunKang
   > 
   > > Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[], and the current byte order of ByteBuffer may not be BIG_ENDIAN, we set the byte order of ByteBuffer to BIG_ENDIAN to be consistent with the byte order used when reading from byte[].
   > 
   > Again, from the [javadoc](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#getDouble--):
   > 
   > > Reads the next eight bytes at this buffer's current position, composing them into a double value according to the current byte order, and then increments the position by eight.
   > 
   > The byte order is already considered while reading from the byte buffer. That means, this is not true:
   > 
   > > Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[]
   > 
   > It will read from byte[] using the current byte order (BIG_ENDIAN or LITTLE_ENDIAN). Did I miss anything?
   
   In fact, the essence of this issue is which of the following approaches we should use for reading the data (see the sketch below):
   1. Force users to write data into the ByteBuffer in BIG_ENDIAN order, and then read it back from the ByteBuffer in BIG_ENDIAN order;
   2. By default, read the data according to the byte order of the ByteBuffer passed in by the user.
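
   A minimal sketch of the two options using plain JDK ByteBuffer calls (the method names are illustrative, not from the PR):
   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;

   public class ByteOrderOptions {

       // Option 1: force BIG_ENDIAN, matching the historical byte[] path.
       // Note that order(...) mutates the buffer's byte order, so a caller
       // may want to save and restore the original order afterwards.
       static double readForcedBigEndian(ByteBuffer data) {
           return data.order(ByteOrder.BIG_ENDIAN).getDouble(data.position());
       }

       // Option 2: honor whatever byte order the caller set on the buffer.
       static double readCallerOrder(ByteBuffer data) {
           return data.getDouble(data.position());
       }
   }
   ```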
   





[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170874370


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   Also, the `data.position()` could be removed because, by default, it'll read from the current position of the byte buffer.





[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r978814376


##########
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##########
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
  */
 public class StringDeserializer implements Deserializer<String> {
+
     private String encoding = StandardCharsets.UTF_8.name();
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
-        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
+        final String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
         Object encodingValue = configs.get(propertyName);
-        if (encodingValue == null)
+        if (encodingValue == null) {
             encodingValue = configs.get("deserializer.encoding");
-        if (encodingValue instanceof String)
+        }
+
+        if (encodingValue instanceof String) {
             encoding = (String) encodingValue;
+        }
     }
 
     @Override
     public String deserialize(String topic, byte[] data) {
         try {
-            if (data == null)
-                return null;
-            else
-                return new String(data, encoding);
+            return data == null ? null : new String(data, encoding);
         } catch (UnsupportedEncodingException e) {
             throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
         }
     }
+
+    @Override
+    public String deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        try {
+            if (data.hasArray()) {
+                return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
+            } else {

Review Comment:
   Did you mean?
   ```
   try {
       if (data.hasArray()) {
           return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
       } 
       return new String(Utils.toArray(data), encoding);
   } catch (UnsupportedEncodingException e) {
       throw new SerializationException("Error when deserializing ByteBuffer to string due to unsupported encoding " + encoding);
   }
   ```





[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1008118806


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1413,22 +1414,19 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                              RecordBatch batch,
                                              Record record) {
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
-            TimestampType timestampType = batch.timestampType();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
-            ByteBuffer valueBytes = record.value();
-            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            final long offset = record.offset();
+            final long timestamp = record.timestamp();
+            final Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
+            final TimestampType timestampType = batch.timestampType();
+            final Headers headers = new RecordHeaders(record.headers());
+            final ByteBuffer keyBytes = record.key();
+            final int keySize = keyBytes == null ? NULL_SIZE : keyBytes.remaining();
+            final K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+            final ByteBuffer valueBytes = record.value();
+            final int valueSize = valueBytes == null ? NULL_SIZE : valueBytes.remaining();
+            final V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType,
-                                        keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
-                                        valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
-                                        key, value, headers, leaderEpoch);
+                    timestamp, timestampType, keySize, valueSize, key, value, headers, leaderEpoch);

Review Comment:
   Did you mean:
   ```
   return new ConsumerRecord<>(partition.topic(), 
       partition.partition(), 
       offset,
       timestamp, 
       timestampType, 
       keySize, 
       valueSize,
       key,
       value,
       headers,
       leaderEpoch);
   ```





[GitHub] [kafka] philipnee commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1008964206


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1413,22 +1414,19 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                              RecordBatch batch,
                                              Record record) {
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
-            TimestampType timestampType = batch.timestampType();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
-            ByteBuffer valueBytes = record.value();
-            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            final long offset = record.offset();
+            final long timestamp = record.timestamp();
+            final Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
+            final TimestampType timestampType = batch.timestampType();
+            final Headers headers = new RecordHeaders(record.headers());
+            final ByteBuffer keyBytes = record.key();
+            final int keySize = keyBytes == null ? NULL_SIZE : keyBytes.remaining();
+            final K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+            final ByteBuffer valueBytes = record.value();
+            final int valueSize = valueBytes == null ? NULL_SIZE : valueBytes.remaining();
+            final V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType,
-                                        keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
-                                        valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
-                                        key, value, headers, leaderEpoch);
+                    timestamp, timestampType, keySize, valueSize, key, value, headers, leaderEpoch);

Review Comment:
   right, if you think this improves the readability. :)





[GitHub] [kafka] mimaison commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1157554656


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1413,22 +1414,28 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                              RecordBatch batch,
                                              Record record) {
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
-            TimestampType timestampType = batch.timestampType();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
-            ByteBuffer valueBytes = record.value();
-            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType,
-                                        keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
-                                        valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
-                                        key, value, headers, leaderEpoch);
+            final long offset = record.offset();

Review Comment:
   There are similar changes in some of the other files too.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1413,22 +1414,28 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                              RecordBatch batch,
                                              Record record) {
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
-            TimestampType timestampType = batch.timestampType();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
-            ByteBuffer valueBytes = record.value();
-            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType,
-                                        keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
-                                        valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
-                                        key, value, headers, leaderEpoch);
+            final long offset = record.offset();

Review Comment:
   We typically don't make local variables `final` in Kafka (apart from Streams), so could we undo these changes?





[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1174387569


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   > Yes, that's because we parse it ourselves from the byte array. But now we have an API provided by the JDK (ByteBuffer) that lets us read according to the byte order in the buffer. So I'm not convinced why we still need to read using `BIG_ENDIAN` order. Do you have another reason to stick to `BIG_ENDIAN` order?
   
   No, that's the only reason I chose method 1.





[GitHub] [kafka] showuon commented on pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1521213534

   Failed tests are unrelated:
   ```
       Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigs()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure
       Build / JDK 17 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure
   ```




[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1174382338


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   > Currently, DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[], 
   
   Yes, that's because we parse it ourselves from the byte array. But now we have an API provided by the JDK (ByteBuffer) that lets us read according to the byte order in the buffer. So I'm not convinced why we still need to read using `BIG_ENDIAN` order. Do you have another reason to stick to `BIG_ENDIAN` order?





[GitHub] [kafka] showuon commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524523279

   Thanks for the improvement!




[GitHub] [kafka] philipnee commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r978844169


##########
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##########
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
  */
 public class StringDeserializer implements Deserializer<String> {
+

Review Comment:
   Usually I like to follow [Google's Java style guide](https://google.github.io/styleguide/javaguide.html#s4.5-line-wrapping). In this case, no blank line after an opening brace. Perhaps we can get one of the committers to comment on the styling.





[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1008120362


##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -263,6 +264,16 @@ public String deserialize(String topic, byte[] data) {
                     return super.deserialize(topic, data);
                 }
             }
+
+            @Override
+            public String deserialize(String topic, Headers headers, ByteBuffer data) {
+                if (i == recordIndex) {
+                    throw new SerializationException();
+                } else {

Review Comment:
   I just kept the same style as the previous method `public String deserialize(String topic, byte[] data)`.





[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r978826658


##########
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##########
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
  */
 public class StringDeserializer implements Deserializer<String> {
+

Review Comment:
   I see that some classes have a blank line after the class declaration and some do not, so which is the recommended style for Kafka?
   * With a blank line: ByteArrayDeserializer, DoubleDeserializer, ListDeserializer;
   * Without a blank line: ByteArraySerializer, ByteBufferSerializer, LongDeserializer





[GitHub] [kafka] philipnee commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1008966693


##########
clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java:
##########
@@ -60,6 +62,17 @@ default T deserialize(String topic, Headers headers, byte[] data) {
         return deserialize(topic, data);
     }
 
+    /**
+     * Deserialize a record value from a ByteBuffer into a value or object.
+     * @param topic topic associated with the data
+     * @param headers headers associated with the record; may be empty.

Review Comment:
   sorry, not super familiar with the Headers class; I assume empty is not null? It would be good to clarify that here, in case a user thinks empty = null.
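
   For illustration, a minimal sketch showing that an "empty" `Headers` is a non-null object with zero entries (using the client's `RecordHeaders` implementation):
   ```java
   import org.apache.kafka.common.header.Headers;
   import org.apache.kafka.common.header.internals.RecordHeaders;

   public class EmptyHeadersDemo {
       public static void main(String[] args) {
           // "Empty" means a real Headers object with no entries, not null.
           Headers headers = new RecordHeaders();
           System.out.println(headers.iterator().hasNext()); // false
       }
   }
   ```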





[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170872007


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   @LinShunKang , why do we need to set the order to `BIG_ENDIAN` before reading it? From the [javadoc](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#getDouble--):
   
   > Reads the next eight bytes at this buffer's current position, composing them into a double value according to the current byte order, and then increments the position by eight. 
   
   It looks like the byte order is already considered while reading from the byte buffer. I had a quick check of the JDK source code, and it does check (and convert) the byte order if needed. Did I miss anything?
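
   A small self-contained demonstration of that point: the same eight bytes decode to different doubles depending on the buffer's byte order (plain JDK, no Kafka types):
   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;

   public class OrderAwareRead {
       public static void main(String[] args) {
           // ByteBuffer defaults to BIG_ENDIAN, so these bytes hold 1.5 big-endian.
           byte[] bytes = ByteBuffer.allocate(8).putDouble(1.5).array();

           ByteBuffer big = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN);
           ByteBuffer little = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);

           System.out.println(big.getDouble(0));    // 1.5
           System.out.println(little.getDouble(0)); // a different value: the bytes
                                                    // are composed in reverse order
       }
   }
   ```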





[GitHub] [kafka] mimaison commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1157532584


##########
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##########
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
  */
 public class StringDeserializer implements Deserializer<String> {
+

Review Comment:
   We don't really have a rule for this. It's usually best practice to avoid changing a lot of spacing in a PR (if it's not required) to reduce noise.





[GitHub] [kafka] showuon commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524581695

   Thanks for the reminder! Updated!




[GitHub] [kafka] philipnee commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1006249694


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1413,22 +1414,19 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                              RecordBatch batch,
                                              Record record) {
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
-            TimestampType timestampType = batch.timestampType();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
-            ByteBuffer valueBytes = record.value();
-            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            final long offset = record.offset();
+            final long timestamp = record.timestamp();
+            final Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
+            final TimestampType timestampType = batch.timestampType();
+            final Headers headers = new RecordHeaders(record.headers());
+            final ByteBuffer keyBytes = record.key();
+            final int keySize = keyBytes == null ? NULL_SIZE : keyBytes.remaining();
+            final K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+            final ByteBuffer valueBytes = record.value();
+            final int valueSize = valueBytes == null ? NULL_SIZE : valueBytes.remaining();
+            final V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType,
-                                        keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
-                                        valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
-                                        key, value, headers, leaderEpoch);
+                    timestamp, timestampType, keySize, valueSize, key, value, headers, leaderEpoch);

Review Comment:
   consider breaking the return params into multiple lines?





[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1009466849


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1413,22 +1414,19 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                              RecordBatch batch,
                                              Record record) {
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
-            TimestampType timestampType = batch.timestampType();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
-            ByteBuffer valueBytes = record.value();
-            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            final long offset = record.offset();
+            final long timestamp = record.timestamp();
+            final Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
+            final TimestampType timestampType = batch.timestampType();
+            final Headers headers = new RecordHeaders(record.headers());
+            final ByteBuffer keyBytes = record.key();
+            final int keySize = keyBytes == null ? NULL_SIZE : keyBytes.remaining();
+            final K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+            final ByteBuffer valueBytes = record.value();
+            final int valueSize = valueBytes == null ? NULL_SIZE : valueBytes.remaining();
+            final V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType,
-                                        keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
-                                        valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
-                                        key, value, headers, leaderEpoch);
+                    timestamp, timestampType, keySize, valueSize, key, value, headers, leaderEpoch);

Review Comment:
   hmm, it looks better this way.





[GitHub] [kafka] philipnee commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r977887202


##########
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##########
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
  */
 public class StringDeserializer implements Deserializer<String> {
+
     private String encoding = StandardCharsets.UTF_8.name();
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
-        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
+        final String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
         Object encodingValue = configs.get(propertyName);
-        if (encodingValue == null)
+        if (encodingValue == null) {
             encodingValue = configs.get("deserializer.encoding");
-        if (encodingValue instanceof String)
+        }
+
+        if (encodingValue instanceof String) {
             encoding = (String) encodingValue;
+        }
     }
 
     @Override
     public String deserialize(String topic, byte[] data) {
         try {
-            if (data == null)
-                return null;
-            else
-                return new String(data, encoding);
+            return data == null ? null : new String(data, encoding);
         } catch (UnsupportedEncodingException e) {
             throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
         }
     }
+
+    @Override
+    public String deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        try {
+            if (data.hasArray()) {
+                return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
+            } else {

Review Comment:
   you don't really need the else here.





[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1161247649


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1413,22 +1414,28 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                              RecordBatch batch,
                                              Record record) {
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
-            TimestampType timestampType = batch.timestampType();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
-            ByteBuffer valueBytes = record.value();
-            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType,
-                                        keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
-                                        valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
-                                        key, value, headers, leaderEpoch);
+            final long offset = record.offset();

Review Comment:
   I have resolved the conflicts and partially reverted the code formatting changes.





[GitHub] [kafka] mimaison commented on pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1496342032

   @guozhangwang @showuon @C0urante You voted on this KIP, can you take a look? Thanks!




[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170903894


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   > Also, the `data.position()` could be removed because, by default, it'll read from the current position of the byte buffer.
   
   @showuon 
   I use ByteBuffer.getDouble(int) instead of ByteBuffer.getDouble(), considering that ByteBuffer.getDouble(int) does not modify the position of the ByteBuffer.
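
   A tiny demonstration of the difference between the absolute and relative reads (plain JDK, no Kafka types):
   ```java
   import java.nio.ByteBuffer;

   public class AbsoluteVsRelativeGet {
       public static void main(String[] args) {
           ByteBuffer buf = ByteBuffer.allocate(8).putDouble(3.14);
           buf.flip();

           // Absolute read: the buffer's position is left untouched.
           double a = buf.getDouble(buf.position());
           System.out.println(a + ", remaining=" + buf.remaining()); // 3.14, remaining=8

           // Relative read: the position advances by eight.
           double b = buf.getDouble();
           System.out.println(b + ", remaining=" + buf.remaining()); // 3.14, remaining=0
       }
   }
   ```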





[GitHub] [kafka] LinShunKang commented on pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1511641622

   > LGTM! Thanks for the improvement!
   > 
   > @LinShunKang , I think this improvement can also apply to other types, e.g. `UUIDDeserializer` is just a wrapper for `StringDeserializer`. Also, the numeric deserializers should be able to handle ByteBuffer directly. WDYT?
   
   Thank you for reviewing.
   I have already implemented `deserialize(String, Headers, ByteBuffer)` for the other Deserializers, PTAL :)
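
   For instance, assuming the PR's new ByteBuffer overload on `StringDeserializer`, a wrapper like `UUIDDeserializer` could pick up the optimization simply by delegating (illustrative sketch, not the PR's actual code):
   ```java
   import java.nio.ByteBuffer;
   import java.util.UUID;

   import org.apache.kafka.common.header.Headers;
   import org.apache.kafka.common.serialization.StringDeserializer;

   // Illustrative wrapper: relies on StringDeserializer's ByteBuffer overload
   // added in this PR, so no extra byte[] copy is made here.
   public class SketchUuidDeserializer {
       private final StringDeserializer inner = new StringDeserializer();

       public UUID deserialize(String topic, Headers headers, ByteBuffer data) {
           String s = inner.deserialize(topic, headers, data);
           return s == null ? null : UUID.fromString(s);
       }
   }
   ```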




[GitHub] [kafka] LinShunKang commented on pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
LinShunKang commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1297164414

   @showuon @guozhangwang @C0urante 
   PTAL : )


[GitHub] [kafka] philipnee commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r978832019


##########
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##########
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
  */
 public class StringDeserializer implements Deserializer<String> {
+
     private String encoding = StandardCharsets.UTF_8.name();
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
-        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
+        final String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
         Object encodingValue = configs.get(propertyName);
-        if (encodingValue == null)
+        if (encodingValue == null) {
             encodingValue = configs.get("deserializer.encoding");
-        if (encodingValue instanceof String)
+        }
+
+        if (encodingValue instanceof String) {
             encoding = (String) encodingValue;
+        }
     }
 
     @Override
     public String deserialize(String topic, byte[] data) {
         try {
-            if (data == null)
-                return null;
-            else
-                return new String(data, encoding);
+            return data == null ? null : new String(data, encoding);
         } catch (UnsupportedEncodingException e) {
             throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
         }
     }
+
+    @Override
+    public String deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        try {
+            if (data.hasArray()) {
+                return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
+            } else {

Review Comment:
   exactly.
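   
   For context, one possible shape of the non-heap (`else`) branch truncated above: copy the remaining bytes out of the direct buffer without disturbing its position (a sketch, not necessarily the PR's exact code):
   
   ```java
   import java.io.UnsupportedEncodingException;
   import java.nio.ByteBuffer;
   
   public class DirectBufferDecode {
       // Decode a non-array-backed (e.g. direct) buffer by copying its
       // remaining bytes; mark/reset leaves the caller's position intact.
       static String decode(ByteBuffer data, String encoding) throws UnsupportedEncodingException {
           final byte[] bytes = new byte[data.remaining()];
           data.mark();      // remember the current position
           data.get(bytes);  // bulk get advances the position
           data.reset();     // restore it
           return new String(bytes, encoding);
       }
   }
   ```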



[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1174332358


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   > @LinShunKang
   > 
   > > Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[], and the current byte order of ByteBuffer may not be BIG_ENDIAN, we set the byte order of ByteBuffer to BIG_ENDIAN to be consistent with the byte order used when reading from byte[].
   > 
   > Again, from the [javadoc](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#getDouble--):
   > 
   > > Reads the next eight bytes at this buffer's current position, composing them into a double value according to the current byte order, and then increments the position by eight.
   > 
   > The byte order is already considered while reading from the byte buffer. That means, this is not true:
   > 
   > > Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[]
   > 
   > It will read from byte[] using the current byte order (BIG_ENDIAN or LITTLE_ENDIAN). Did I miss anything?
   
   In fact, the essence of this issue is which of the following approaches we should choose for reading the data:
   1. Force users to write data into the ByteBuffer in BIG_ENDIAN order, and then read it back in BIG_ENDIAN;
   2. Read the data according to the byte order of the ByteBuffer provided by the user.
   
   Currently, DoubleDeserializer and the other number Deserializers read from byte[] in BIG_ENDIAN byte order, so I chose option 1.
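   
   To make the trade-off concrete, a small illustration (plain JDK, not PR code): with option 2 a LITTLE_ENDIAN buffer round-trips correctly, while option 1 reinterprets the same bytes in BIG_ENDIAN:
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   
   public class ByteOrderDemo {
       public static void main(String[] args) {
           ByteBuffer buf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
           buf.putDouble(1.5).flip();
           // Option 2: honor the buffer's current order -> prints 1.5
           System.out.println(buf.getDouble(0));
           // Option 1: force BIG_ENDIAN to match the existing byte[] path;
           // the little-endian bytes are reinterpreted -> does not print 1.5
           buf.order(ByteOrder.BIG_ENDIAN);
           System.out.println(buf.getDouble(0));
       }
   }
   ```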
   



[GitHub] [kafka] LinShunKang commented on pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1518646357

   > LGTM! Thanks for the update, and also improve the boolean deserializer.
   
   Thank you for reviewing : )


[GitHub] [kafka] LinShunKang commented on pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1501053449

   > Thanks for the PR. The main changes seem to make sense to me but there's quite a few formatting changes that could be reverted to keep the PR clean and focused.
   > 
   > Can you also rebase on trunk to resolve the conflicts?
   
   Thank you for reviewing. I will address these issues.


[GitHub] [kafka] LinShunKang commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524475111

   > @LinShunKang , could you create a JIRA issue related to this KIP, and make the PR title started with `KAFKA-xxxx: Reduce...` instead of `KIP-xxx: Reduce...`. Also, the KIP needs to point to that JIRA issue in the `Status` section. You can refer to this [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+log+dirs+total+and+usable+space+via+Kafka+API#KIP827:ExposelogdirstotalandusablespaceviaKafkaAPI-Status) . Thanks.
   
   I have completed the changes mentioned above, PTAL.


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by GitBox <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1009465243


##########
clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java:
##########
@@ -60,6 +62,17 @@ default T deserialize(String topic, Headers headers, byte[] data) {
         return deserialize(topic, data);
     }
 
+    /**
+     * Deserialize a record value from a ByteBuffer into a value or object.
+     * @param topic topic associated with the data
+     * @param headers headers associated with the record; may be empty.

Review Comment:
   I copied this documentation from `deserialize(String topic, Headers headers, byte[] data)`.
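   
   For reference, the default method this javadoc describes can be sketched as a fallback that copies the buffer into a byte[] and delegates to the existing overload (illustrative shape, not necessarily the merged code):
   
   ```java
   import java.nio.ByteBuffer;
   import org.apache.kafka.common.header.Headers;
   
   public interface DeserializerSketch<T> {
       T deserialize(String topic, Headers headers, byte[] data);
   
       // Sketch of the new default: fall back to the byte[] overload by copying.
       default T deserialize(String topic, Headers headers, ByteBuffer data) {
           if (data == null)
               return deserialize(topic, headers, (byte[]) null);
           final byte[] bytes = new byte[data.remaining()];
           data.get(bytes);
           return deserialize(topic, headers, bytes);
       }
   }
   ```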



[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170872007


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   @LinShunKang , why do we need to set the order to `BIG_ENDIAN` before reading it? From the [javadoc](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#getDouble--):
   
   > Reads the next eight bytes at this buffer's current position, composing them into a double value according to the current byte order, and then increments the position by eight. 
   
   It looks like the byte order is already considered while reading from the byte buffer. I had a quick look at the JDK source code, and it does check (and convert) the byte order if needed.



[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1174314208


##########
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##########
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
         }
         return Double.longBitsToDouble(value);
     }
+
+    @Override
+    public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.remaining() != 8) {
+            throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+        }
+
+        final ByteOrder srcOrder = data.order();
+        data.order(BIG_ENDIAN);
+
+        final double value = data.getDouble(data.position());

Review Comment:
   @LinShunKang 
   > Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[], and the current byte order of ByteBuffer may not be BIG_ENDIAN, we set the byte order of ByteBuffer to BIG_ENDIAN to be consistent with the byte order used when reading from byte[].
   
   Again, from the [javadoc](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#getDouble--):
   
   > Reads the next eight bytes at this buffer's current position, composing them into a double value according to the current byte order, and then increments the position by eight.
   
   The byte order is already considered while reading from the byte buffer. That means, this is not true:
   > Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN byte order to read from byte[]
   
   It will read from byte[] using the current byte order. Did I miss anything?



[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1161224453


##########
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##########
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
  */
 public class StringDeserializer implements Deserializer<String> {
+

Review Comment:
   > We don't really have a rule for this. It's usually best practice to avoid changing a lot of spacing in a PR (if it's not required) to reduce noise.
   
   Ok, I got it.



[GitHub] [kafka] showuon commented on pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1521216831

   @LinShunKang , could you create a JIRA issue related to this KIP, and make the PR title started with `KAFKA-xxxx: Reduce...` instead of `KIP-xxx: Reduce...`. Also, the KIP needs to point to that JIRA issue in the `Status` section. You can refer to this [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+log+dirs+total+and+usable+space+via+Kafka+API#KIP827:ExposelogdirstotalandusablespaceviaKafkaAPI-Status) . Thanks.


[GitHub] [kafka] LinShunKang commented on pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1524573080

   @showuon 
   Thanks for your help!
   But it seems you've confused JIRA KAFKA-14944 with KAFKA-14945.


[GitHub] [kafka] showuon merged pull request #12545: KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon merged PR #12545:
URL: https://github.com/apache/kafka/pull/12545

