You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2021/02/16 15:57:02 UTC

[kafka] branch trunk updated: MINOR: remove duplicate code of serializing auto-generated data (#10128)

This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe3f2be  MINOR: remove duplicate code of serializing auto-generated data (#10128)
fe3f2be is described below

commit fe3f2bec8e46b22650a4ee2552379d2abbaa6f40
Author: dengziming <sw...@163.com>
AuthorDate: Tue Feb 16 23:55:20 2021 +0800

    MINOR: remove duplicate code of serializing auto-generated data (#10128)
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
 .../kafka/common/message/RecordsSerdeTest.java     | 14 ++----------
 .../common/message/SimpleExampleMessageTest.java   | 25 +++++-----------------
 2 files changed, 7 insertions(+), 32 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java b/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
index dcf6e0f..739bed9 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.message;
 
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.SimpleRecord;
@@ -69,7 +69,7 @@ public class RecordsSerdeTest {
     }
 
     private void testRoundTrip(SimpleRecordsMessageData message, short version) {
-        ByteBuffer buf = serialize(message, version);
+        ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
         SimpleRecordsMessageData message2 = deserialize(buf.duplicate(), version);
         assertEquals(message, message2);
         assertEquals(message.hashCode(), message2.hashCode());
@@ -80,14 +80,4 @@ public class RecordsSerdeTest {
         return new SimpleRecordsMessageData(readable, version);
     }
 
-    private ByteBuffer serialize(SimpleRecordsMessageData message, short version) {
-        ObjectSerializationCache cache = new ObjectSerializationCache();
-        int totalMessageSize = message.size(cache, version);
-        ByteBuffer buffer = ByteBuffer.allocate(totalMessageSize);
-        ByteBufferAccessor writer = new ByteBufferAccessor(buffer);
-        message.write(writer, cache, version);
-        buffer.flip();
-        return buffer;
-    }
-
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index 5b5f33d..1cdafcd 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.junit.jupiter.api.Test;
@@ -79,10 +80,7 @@ public class SimpleExampleMessageTest {
         out.setProcessId(uuid);
         out.setZeroCopyByteBuffer(buf);
 
-        ObjectSerializationCache cache = new ObjectSerializationCache();
-        final ByteBuffer buffer = ByteBuffer.allocate(out.size(cache, (short) 1));
-        out.write(new ByteBufferAccessor(buffer), cache, (short) 1);
-        buffer.rewind();
+        final ByteBuffer buffer = MessageUtil.toByteBuffer(out, (short) 1);
 
         final SimpleExampleMessageData in = new SimpleExampleMessageData();
         in.read(new ByteBufferAccessor(buffer), (short) 1);
@@ -104,10 +102,7 @@ public class SimpleExampleMessageTest {
         out.setZeroCopyByteBuffer(buf1);
         out.setNullableZeroCopyByteBuffer(buf2);
 
-        ObjectSerializationCache cache = new ObjectSerializationCache();
-        final ByteBuffer buffer = ByteBuffer.allocate(out.size(cache, (short) 1));
-        out.write(new ByteBufferAccessor(buffer), cache, (short) 1);
-        buffer.rewind();
+        final ByteBuffer buffer = MessageUtil.toByteBuffer(out, (short) 1);
 
         final SimpleExampleMessageData in = new SimpleExampleMessageData();
         in.read(new ByteBufferAccessor(buffer), (short) 1);
@@ -248,7 +243,7 @@ public class SimpleExampleMessageTest {
 
         testRoundTrip(new SimpleExampleMessageData().
                 setMyString("blah").
-                setMyTaggedIntArray(Arrays.asList(4)).
+                setMyTaggedIntArray(Collections.singletonList(4)).
                 setTaggedLong(0x123443211234432L),
             message -> assertEquals(0x123443211234432L,
                 message.taggedLong()));
@@ -306,16 +301,6 @@ public class SimpleExampleMessageTest {
         testRoundTrip(message, (short) 2);
     }
 
-    private ByteBuffer serialize(SimpleExampleMessageData message, short version) {
-        ObjectSerializationCache cache = new ObjectSerializationCache();
-        int size = message.size(cache, version);
-        ByteBuffer buf = ByteBuffer.allocate(size);
-        message.write(new ByteBufferAccessor(buf), cache, version);
-        buf.flip();
-        assertEquals(size, buf.remaining());
-        return buf;
-    }
-
     private SimpleExampleMessageData deserialize(ByteBuffer buf, short version) {
         SimpleExampleMessageData message = new SimpleExampleMessageData();
         message.read(new ByteBufferAccessor(buf.duplicate()), version);
@@ -335,7 +320,7 @@ public class SimpleExampleMessageTest {
                                Consumer<SimpleExampleMessageData> validator,
                                short version) {
         validator.accept(message);
-        ByteBuffer buf = serialize(message, version);
+        ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
 
         SimpleExampleMessageData message2 = deserialize(buf.duplicate(), version);
         validator.accept(message2);