You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/04 15:46:30 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #7409: MINOR: Skip conversion to `Struct` when serializing generated requests/responses

chia7712 commented on a change in pull request #7409:
URL: https://github.com/apache/kafka/pull/7409#discussion_r536149423



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -95,21 +93,33 @@ public short version() {
         return version;
     }
 
-    public Send toSend(String destination, RequestHeader header) {
-        return new NetworkSend(destination, serialize(header));
+    public ApiKeys apiKey() {
+        return apiKey;
     }
 
-    /**
-     * Use with care, typically {@link #toSend(String, RequestHeader)} should be used instead.
-     */
-    public ByteBuffer serialize(RequestHeader header) {
-        return RequestUtils.serialize(header.toStruct(), toStruct());
+    public final Send toSend(String destination, RequestHeader header) {
+        return SendBuilder.buildRequestSend(destination, header, data());
+    }
+
+    // Visible for testing
+    public final ByteBuffer serializeWithHeader(RequestHeader header) {
+        return RequestUtils.serialize(header.data(), header.headerVersion(), data(), version);
+    }
+
+    protected abstract Message data();
+
+    // Visible for testing
+    public final ByteBuffer serializeBody() {
+        return RequestUtils.serialize(null, (short) 0, data(), version);
     }
 
-    protected abstract Struct toStruct();
+    // Visible for testing
+    final int sizeInBytes() {
+        return data().size(new ObjectSerializationCache(), version);
+    }
 
     public String toString(boolean verbose) {
-        return toStruct().toString();
+        return data().toString();

Review comment:
       Is this `toString` variant still useful?

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java
##########
@@ -209,16 +210,13 @@ public static Send buildResponseSend(
 
     private static Send buildSend(
         String destination,
-        ApiMessage header,
+        Message header,
         short headerVersion,
-        ApiMessage apiMessage,
+        Message apiMessage,
         short apiVersion
     ) {
         ObjectSerializationCache serializationCache = new ObjectSerializationCache();

Review comment:
       Not sure whether we should reduce the initial capacity of `ObjectSerializationCache`. Not all requests/responses use the full cache.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -95,21 +93,33 @@ public short version() {
         return version;
     }
 
-    public Send toSend(String destination, RequestHeader header) {
-        return new NetworkSend(destination, serialize(header));
+    public ApiKeys apiKey() {
+        return apiKey;
     }
 
-    /**
-     * Use with care, typically {@link #toSend(String, RequestHeader)} should be used instead.
-     */
-    public ByteBuffer serialize(RequestHeader header) {
-        return RequestUtils.serialize(header.toStruct(), toStruct());
+    public final Send toSend(String destination, RequestHeader header) {
+        return SendBuilder.buildRequestSend(destination, header, data());
+    }
+
+    // Visible for testing
+    public final ByteBuffer serializeWithHeader(RequestHeader header) {

Review comment:
       How about moving this method to test code? ```MessageTestUtil``` could be a good place. 

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
##########
@@ -90,4 +75,37 @@ public static boolean hasTransactionalRecords(ProduceRequest request) {
         }
         return new AbstractMap.SimpleEntry<>(hasIdempotentRecords, hasTransactionalRecords);
     }
+
+    public static MessageSizeAccumulator size(
+        ObjectSerializationCache serializationCache,
+        Message header,
+        short headerVersion,
+        Message apiMessage,
+        short apiVersion
+    ) {
+        MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
+        if (header != null)
+            header.addSize(messageSize, serializationCache, headerVersion);
+        apiMessage.addSize(messageSize, serializationCache, apiVersion);
+        return messageSize;
+    }
+
+    public static ByteBuffer serialize(
+        Message header,
+        short headerVersion,
+        Message apiMessage,
+        short apiVersion
+    ) {
+        ObjectSerializationCache serializationCache = new ObjectSerializationCache();
+        MessageSizeAccumulator messageSize = RequestUtils.size(serializationCache, header, headerVersion, apiMessage, apiVersion);
+
+        ByteBuffer buffer = ByteBuffer.allocate(messageSize.totalSize());

Review comment:
       Should it subtract ```messageSize.zeroCopySize()```?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
##########
@@ -96,8 +92,8 @@ public boolean isValid() {
     }
 
     @Override
-    protected Struct toStruct() {
-        return data.toStruct(version());
+    public ApiVersionsRequestData data() {

Review comment:
       Is this ```public``` only for testing?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##########
@@ -81,14 +70,23 @@ public RequestHeaderData data() {
         return data;
     }
 
+    public void write(ByteBuffer buffer, ObjectSerializationCache serializationCache) {

Review comment:
       This is used only by tests. Maybe we can move it to test scope.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -95,21 +93,33 @@ public short version() {
         return version;
     }
 
-    public Send toSend(String destination, RequestHeader header) {
-        return new NetworkSend(destination, serialize(header));
+    public ApiKeys apiKey() {
+        return apiKey;
     }
 
-    /**
-     * Use with care, typically {@link #toSend(String, RequestHeader)} should be used instead.
-     */
-    public ByteBuffer serialize(RequestHeader header) {
-        return RequestUtils.serialize(header.toStruct(), toStruct());
+    public final Send toSend(String destination, RequestHeader header) {
+        return SendBuilder.buildRequestSend(destination, header, data());
+    }
+
+    // Visible for testing
+    public final ByteBuffer serializeWithHeader(RequestHeader header) {

Review comment:
       Ditto for the following "Visible for testing" code.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -95,21 +93,33 @@ public short version() {
         return version;
     }
 
-    public Send toSend(String destination, RequestHeader header) {
-        return new NetworkSend(destination, serialize(header));
+    public ApiKeys apiKey() {
+        return apiKey;
     }
 
-    /**
-     * Use with care, typically {@link #toSend(String, RequestHeader)} should be used instead.
-     */
-    public ByteBuffer serialize(RequestHeader header) {
-        return RequestUtils.serialize(header.toStruct(), toStruct());
+    public final Send toSend(String destination, RequestHeader header) {
+        return SendBuilder.buildRequestSend(destination, header, data());
+    }
+
+    // Visible for testing
+    public final ByteBuffer serializeWithHeader(RequestHeader header) {

Review comment:
       BTW, this method duplicates TestUtils#serializeRequestHeader.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org