You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2021/02/16 15:57:02 UTC
[kafka] branch trunk updated: MINOR: remove duplicate code of
serializing auto-generated data (#10128)
This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fe3f2be MINOR: remove duplicate code of serializing auto-generated data (#10128)
fe3f2be is described below
commit fe3f2bec8e46b22650a4ee2552379d2abbaa6f40
Author: dengziming <sw...@163.com>
AuthorDate: Tue Feb 16 23:55:20 2021 +0800
MINOR: remove duplicate code of serializing auto-generated data (#10128)
Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
.../kafka/common/message/RecordsSerdeTest.java | 14 ++----------
.../common/message/SimpleExampleMessageTest.java | 25 +++++-----------------
2 files changed, 7 insertions(+), 32 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java b/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
index dcf6e0f..739bed9 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
@@ -17,7 +17,7 @@
package org.apache.kafka.common.message;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
@@ -69,7 +69,7 @@ public class RecordsSerdeTest {
}
private void testRoundTrip(SimpleRecordsMessageData message, short version) {
- ByteBuffer buf = serialize(message, version);
+ ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
SimpleRecordsMessageData message2 = deserialize(buf.duplicate(), version);
assertEquals(message, message2);
assertEquals(message.hashCode(), message2.hashCode());
@@ -80,14 +80,4 @@ public class RecordsSerdeTest {
return new SimpleRecordsMessageData(readable, version);
}
- private ByteBuffer serialize(SimpleRecordsMessageData message, short version) {
- ObjectSerializationCache cache = new ObjectSerializationCache();
- int totalMessageSize = message.size(cache, version);
- ByteBuffer buffer = ByteBuffer.allocate(totalMessageSize);
- ByteBufferAccessor writer = new ByteBufferAccessor(buffer);
- message.write(writer, cache, version);
- buffer.flip();
- return buffer;
- }
-
}
diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index 5b5f33d..1cdafcd 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.utils.ByteUtils;
import org.junit.jupiter.api.Test;
@@ -79,10 +80,7 @@ public class SimpleExampleMessageTest {
out.setProcessId(uuid);
out.setZeroCopyByteBuffer(buf);
- ObjectSerializationCache cache = new ObjectSerializationCache();
- final ByteBuffer buffer = ByteBuffer.allocate(out.size(cache, (short) 1));
- out.write(new ByteBufferAccessor(buffer), cache, (short) 1);
- buffer.rewind();
+ final ByteBuffer buffer = MessageUtil.toByteBuffer(out, (short) 1);
final SimpleExampleMessageData in = new SimpleExampleMessageData();
in.read(new ByteBufferAccessor(buffer), (short) 1);
@@ -104,10 +102,7 @@ public class SimpleExampleMessageTest {
out.setZeroCopyByteBuffer(buf1);
out.setNullableZeroCopyByteBuffer(buf2);
- ObjectSerializationCache cache = new ObjectSerializationCache();
- final ByteBuffer buffer = ByteBuffer.allocate(out.size(cache, (short) 1));
- out.write(new ByteBufferAccessor(buffer), cache, (short) 1);
- buffer.rewind();
+ final ByteBuffer buffer = MessageUtil.toByteBuffer(out, (short) 1);
final SimpleExampleMessageData in = new SimpleExampleMessageData();
in.read(new ByteBufferAccessor(buffer), (short) 1);
@@ -248,7 +243,7 @@ public class SimpleExampleMessageTest {
testRoundTrip(new SimpleExampleMessageData().
setMyString("blah").
- setMyTaggedIntArray(Arrays.asList(4)).
+ setMyTaggedIntArray(Collections.singletonList(4)).
setTaggedLong(0x123443211234432L),
message -> assertEquals(0x123443211234432L,
message.taggedLong()));
@@ -306,16 +301,6 @@ public class SimpleExampleMessageTest {
testRoundTrip(message, (short) 2);
}
- private ByteBuffer serialize(SimpleExampleMessageData message, short version) {
- ObjectSerializationCache cache = new ObjectSerializationCache();
- int size = message.size(cache, version);
- ByteBuffer buf = ByteBuffer.allocate(size);
- message.write(new ByteBufferAccessor(buf), cache, version);
- buf.flip();
- assertEquals(size, buf.remaining());
- return buf;
- }
-
private SimpleExampleMessageData deserialize(ByteBuffer buf, short version) {
SimpleExampleMessageData message = new SimpleExampleMessageData();
message.read(new ByteBufferAccessor(buf.duplicate()), version);
@@ -335,7 +320,7 @@ public class SimpleExampleMessageTest {
Consumer<SimpleExampleMessageData> validator,
short version) {
validator.accept(message);
- ByteBuffer buf = serialize(message, version);
+ ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
SimpleExampleMessageData message2 = deserialize(buf.duplicate(), version);
validator.accept(message2);