You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/06/07 17:54:04 UTC
[kafka] branch trunk updated: KAFKA-12338; Remove unused `MetadataParser` (#10793)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 51665b9 KAFKA-12338; Remove unused `MetadataParser` (#10793)
51665b9 is described below
commit 51665b9f39f0df2f474de21ccc31af0b3b1811ae
Author: dengziming <sw...@163.com>
AuthorDate: Tue Jun 8 01:52:16 2021 +0800
KAFKA-12338; Remove unused `MetadataParser` (#10793)
`MetadataParser` duplicates `MetadataRecordSerde` and is not used anywhere in the code, so we can remove it. It did, however, have some useful validations, which have been moved into `MetadataRecordSerde`.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/common/protocol/ByteBufferAccessor.java | 5 +
.../common/protocol/DataInputStreamReadable.java | 9 ++
.../org/apache/kafka/common/protocol/Readable.java | 1 +
.../org/apache/kafka/metadata/MetadataParser.java | 115 ---------------
.../apache/kafka/metadata/MetadataParserTest.java | 155 --------------------
.../kafka/metadata/MetadataRecordSerdeTest.java | 158 ++++++++++++++++++++-
.../serialization/AbstractApiMessageSerde.java | 42 ++++--
.../serialization}/MetadataParseException.java | 11 +-
8 files changed, 213 insertions(+), 283 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
index 3c5c309..bd0925d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
@@ -133,6 +133,11 @@ public class ByteBufferAccessor implements Readable, Writable {
return ByteUtils.readVarlong(buf);
}
+ @Override
+ public int remaining() {
+ return buf.remaining();
+ }
+
public void flip() {
buf.flip();
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java b/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
index 93c6c59..70ed52d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
@@ -119,6 +119,15 @@ public class DataInputStreamReadable implements Readable, Closeable {
}
@Override
+ public int remaining() {
+ try {
+ return input.available();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
public void close() {
try {
input.close();
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
index 46879cd..9c9e461 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
@@ -37,6 +37,7 @@ public interface Readable {
ByteBuffer readByteBuffer(int length);
int readVarint();
long readVarlong();
+ int remaining();
default String readString(int length) {
byte[] arr = new byte[length];
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/MetadataParser.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataParser.java
deleted file mode 100644
index d172dc7..0000000
--- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataParser.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata;
-
-import org.apache.kafka.common.metadata.MetadataRecordType;
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.protocol.ObjectSerializationCache;
-import org.apache.kafka.common.utils.ByteUtils;
-
-import java.nio.ByteBuffer;
-
-public class MetadataParser {
- public static final int MAX_SERIALIZED_EVENT_SIZE = 32 * 1024 * 1024;
-
- private static short unsignedIntToShort(int val, String entity) {
- if (val > Short.MAX_VALUE) {
- throw new MetadataParseException("Value for " + entity + " was too large.");
- }
- return (short) val;
- }
-
- /**
- * Parse the given buffer.
- *
- * @param buffer The buffer. Its offsets will be modified.
- *
- * @return The metadata message.
- */
- public static ApiMessage read(ByteBuffer buffer) {
- short type;
- try {
- type = unsignedIntToShort(ByteUtils.readUnsignedVarint(buffer), "type");
- } catch (Exception e) {
- throw new MetadataParseException("Failed to read variable-length type " +
- "number: " + e.getClass().getSimpleName() + ": " + e.getMessage());
- }
- short version;
- try {
- version = unsignedIntToShort(ByteUtils.readUnsignedVarint(buffer), "version");
- } catch (Exception e) {
- throw new MetadataParseException("Failed to read variable-length " +
- "version number: " + e.getClass().getSimpleName() + ": " + e.getMessage());
- }
- MetadataRecordType recordType = MetadataRecordType.fromId(type);
- ApiMessage message = recordType.newMetadataRecord();
- try {
- message.read(new ByteBufferAccessor(buffer), version);
- } catch (Exception e) {
- throw new MetadataParseException(recordType + "#parse failed: " +
- e.getClass().getSimpleName() + ": " + e.getMessage());
- }
- if (buffer.hasRemaining()) {
- throw new MetadataParseException("Found " + buffer.remaining() +
- " byte(s) of garbage after " + recordType);
- }
- return message;
- }
-
- /**
- * Find the size of an API message and set up its ObjectSerializationCache.
- *
- * @param message The metadata message.
- * @param version The metadata message version.
- * @param cache The object serialization cache to use.
- *
- * @return The size
- */
- public static int size(ApiMessage message,
- short version,
- ObjectSerializationCache cache) {
- long messageSize = message.size(cache, version);
- long totalSize = messageSize +
- ByteUtils.sizeOfUnsignedVarint(message.apiKey()) +
- ByteUtils.sizeOfUnsignedVarint(version);
- if (totalSize > MAX_SERIALIZED_EVENT_SIZE) {
- throw new RuntimeException("Event size would be " + totalSize + ", but the " +
- "maximum serialized event size is " + MAX_SERIALIZED_EVENT_SIZE);
- }
- return (int) totalSize;
- }
-
- /**
- * Convert the given metadata message into a ByteBuffer.
- *
- * @param message The metadata message.
- * @param version The metadata message version.
- * @param cache The object serialization cache to use. This must have been
- * initialized by calling size() previously.
- * @param buf The buffer to write to.
- */
- public static void write(ApiMessage message,
- short version,
- ObjectSerializationCache cache,
- ByteBuffer buf) {
- ByteUtils.writeUnsignedVarint(message.apiKey(), buf);
- ByteUtils.writeUnsignedVarint(version, buf);
- message.write(new ByteBufferAccessor(buf), cache, version);
- }
-}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
deleted file mode 100644
index 41e968c..0000000
--- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata;
-
-import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.common.metadata.PartitionRecord;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord;
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.common.protocol.ObjectSerializationCache;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@Timeout(value = 40)
-public class MetadataParserTest {
- private static final Logger log =
- LoggerFactory.getLogger(MetadataParserTest.class);
-
- /**
- * Test some serialization / deserialization round trips.
- */
- @Test
- public void testRoundTrips() {
- testRoundTrip(new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(2), (short) 0);
- testRoundTrip(new ConfigRecord().setName("my.config.value").
- setResourceName("foo").setResourceType((byte) 0).setValue("bar"), (short) 0);
- }
-
- private static void testRoundTrip(ApiMessage message, short version) {
- ObjectSerializationCache cache = new ObjectSerializationCache();
- int size = MetadataParser.size(message, version, cache);
- ByteBuffer buffer = ByteBuffer.allocate(size);
- MetadataParser.write(message, version, cache, buffer);
- buffer.flip();
- ApiMessage message2 = MetadataParser.read(buffer.duplicate());
- assertEquals(message, message2);
- assertEquals(message2, message);
-
- ObjectSerializationCache cache2 = new ObjectSerializationCache();
- int size2 = MetadataParser.size(message2, version, cache2);
- assertEquals(size, size2);
- ByteBuffer buffer2 = ByteBuffer.allocate(size);
- MetadataParser.write(message2, version, cache2, buffer2);
- buffer2.flip();
- assertEquals(buffer.duplicate(), buffer2.duplicate());
- }
-
- /**
- * Test attempting to serialize a message which is too big to be serialized.
- */
- @Test
- public void testMaxSerializedEventSizeCheck() {
- List<Integer> longReplicaList =
- new ArrayList<>(MetadataParser.MAX_SERIALIZED_EVENT_SIZE / Integer.BYTES);
- for (int i = 0; i < MetadataParser.MAX_SERIALIZED_EVENT_SIZE / Integer.BYTES; i++) {
- longReplicaList.add(i);
- }
- PartitionRecord partitionRecord = new PartitionRecord().
- setReplicas(longReplicaList);
- ObjectSerializationCache cache = new ObjectSerializationCache();
- assertEquals("Event size would be 33554482, but the maximum serialized event " +
- "size is 33554432", assertThrows(RuntimeException.class, () -> {
- MetadataParser.size(partitionRecord, (short) 0, cache);
- }).getMessage());
- }
-
- /**
- * Test attemping to parse an event which has a malformed message type varint.
- */
- @Test
- public void testParsingMalformedMessageTypeVarint() {
- ByteBuffer buffer = ByteBuffer.allocate(64);
- buffer.clear();
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.position(0);
- buffer.limit(64);
- assertStartsWith("Failed to read variable-length type number",
- assertThrows(MetadataParseException.class, () -> {
- MetadataParser.read(buffer);
- }).getMessage());
- }
-
- /**
- * Test attemping to parse an event which has a malformed message version varint.
- */
- @Test
- public void testParsingMalformedMessageVersionVarint() {
- ByteBuffer buffer = ByteBuffer.allocate(64);
- buffer.clear();
- buffer.put((byte) 0x00);
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.put((byte) 0x80);
- buffer.position(0);
- buffer.limit(64);
- assertStartsWith("Failed to read variable-length version number",
- assertThrows(MetadataParseException.class, () -> {
- MetadataParser.read(buffer);
- }).getMessage());
- }
-
- /**
- * Test attemping to parse an event which has a malformed message version varint.
- */
- @Test
- public void testParsingRecordWithGarbageAtEnd() {
- RegisterBrokerRecord message = new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(2);
- ObjectSerializationCache cache = new ObjectSerializationCache();
- int size = MetadataParser.size(message, (short) 0, cache);
- ByteBuffer buffer = ByteBuffer.allocate(size + 1);
- MetadataParser.write(message, (short) 0, cache, buffer);
- buffer.clear();
- assertStartsWith("Found 1 byte(s) of garbage after",
- assertThrows(MetadataParseException.class, () -> {
- MetadataParser.read(buffer);
- }).getMessage());
- }
-
- private static void assertStartsWith(String prefix, String str) {
- assertTrue(str.startsWith(prefix),
- "Expected string '" + str + "' to start with '" + prefix + "'");
- }
-}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
index 77906e7..3a25b8d 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
@@ -17,18 +17,20 @@
package org.apache.kafka.metadata;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.serialization.MetadataParseException;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class MetadataRecordSerdeTest {
@@ -65,8 +67,158 @@ class MetadataRecordSerdeTest {
buffer.flip();
MetadataRecordSerde serde = new MetadataRecordSerde();
- assertThrows(SerializationException.class,
- () -> serde.read(new ByteBufferAccessor(buffer), 16));
+ assertStartsWith("Could not deserialize metadata record due to unknown frame version",
+ assertThrows(MetadataParseException.class,
+ () -> serde.read(new ByteBufferAccessor(buffer), 16)).getMessage());
+ }
+
+ /**
+ * Test attempting to parse an event which has a malformed frame version varint.
+ */
+ @Test
+ public void testParsingMalformedFrameVersionVarint() {
+ MetadataRecordSerde serde = new MetadataRecordSerde();
+ ByteBuffer buffer = ByteBuffer.allocate(64);
+ buffer.clear();
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.position(0);
+ buffer.limit(64);
+ assertStartsWith("Error while reading frame version",
+ assertThrows(MetadataParseException.class,
+ () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+ }
+
+ /**
+ * Test attempting to parse an event which has a malformed message type varint.
+ */
+ @Test
+ public void testParsingMalformedMessageTypeVarint() {
+ MetadataRecordSerde serde = new MetadataRecordSerde();
+ ByteBuffer buffer = ByteBuffer.allocate(64);
+ buffer.clear();
+ buffer.put((byte) 0x00);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.position(0);
+ buffer.limit(64);
+ assertStartsWith("Error while reading type",
+ assertThrows(MetadataParseException.class,
+ () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+ }
+
+ /**
+ * Test attempting to parse an event which has a malformed message version varint.
+ */
+ @Test
+ public void testParsingMalformedMessageVersionVarint() {
+ MetadataRecordSerde serde = new MetadataRecordSerde();
+ ByteBuffer buffer = ByteBuffer.allocate(64);
+ buffer.clear();
+ buffer.put((byte) 0x00);
+ buffer.put((byte) 0x08);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.put((byte) 0x80);
+ buffer.position(0);
+ buffer.limit(64);
+ assertStartsWith("Error while reading version",
+ assertThrows(MetadataParseException.class,
+ () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+ }
+
+ /**
+ * Test attempting to parse an event which has a version > Short.MAX_VALUE
+ */
+ @Test
+ public void testParsingVersionTooLarge() {
+ MetadataRecordSerde serde = new MetadataRecordSerde();
+ ByteBuffer buffer = ByteBuffer.allocate(64);
+ buffer.clear();
+ buffer.put((byte) 0x00); // frame version
+ buffer.put((byte) 0x08); // apiKey
+ buffer.put((byte) 0xff); // api version
+ buffer.put((byte) 0xff); // api version
+ buffer.put((byte) 0xff); // api version
+ buffer.put((byte) 0x7f); // api version end
+ buffer.put((byte) 0x80);
+ buffer.position(0);
+ buffer.limit(64);
+ assertStartsWith("Value for version was too large",
+ assertThrows(MetadataParseException.class,
+ () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+ }
+
+ /**
+ * Test attempting to parse an event which has an unsupported API key.
+ */
+ @Test
+ public void testParsingUnsupportedApiKey() {
+ MetadataRecordSerde serde = new MetadataRecordSerde();
+ ByteBuffer buffer = ByteBuffer.allocate(64);
+ buffer.put((byte) 0x00); // frame version
+ buffer.put((byte) 0xff); // apiKey
+ buffer.put((byte) 0x7f); // apiKey
+ buffer.put((byte) 0x00); // api version
+ buffer.put((byte) 0x80);
+ buffer.position(0);
+ buffer.limit(64);
+ assertStartsWith("Unknown metadata id ",
+ assertThrows(MetadataParseException.class,
+ () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getCause().getMessage());
+ }
+
+ /**
+ * Test attempting to parse an event which has a malformed message body.
+ */
+ @Test
+ public void testParsingMalformedMessage() {
+ MetadataRecordSerde serde = new MetadataRecordSerde();
+ ByteBuffer buffer = ByteBuffer.allocate(4);
+ buffer.put((byte) 0x00); // frame version
+ buffer.put((byte) 0x00); // apiKey
+ buffer.put((byte) 0x00); // apiVersion
+ buffer.put((byte) 0x80); // malformed data
+ buffer.position(0);
+ buffer.limit(4);
+ assertStartsWith("Failed to deserialize record with type",
+ assertThrows(MetadataParseException.class,
+ () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+ }
+
+ /**
+ * Test attempting to parse a record which has garbage bytes after the end of the message.
+ */
+ @Test
+ public void testParsingRecordWithGarbageAtEnd() {
+ MetadataRecordSerde serde = new MetadataRecordSerde();
+ RegisterBrokerRecord message = new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(2);
+
+ ObjectSerializationCache cache = new ObjectSerializationCache();
+ ApiMessageAndVersion messageAndVersion = new ApiMessageAndVersion(message, (short) 0);
+ int size = serde.recordSize(messageAndVersion, cache);
+ ByteBuffer buffer = ByteBuffer.allocate(size + 1);
+
+ serde.write(messageAndVersion, cache, new ByteBufferAccessor(buffer));
+ buffer.clear();
+ assertStartsWith("Found 1 byte(s) of garbage after",
+ assertThrows(MetadataParseException.class,
+ () -> serde.read(new ByteBufferAccessor(buffer), size + 1)).getMessage());
+ }
+
+ private static void assertStartsWith(String prefix, String str) {
+ assertTrue(str.startsWith(prefix),
+ "Expected string '" + str + "' to start with '" + prefix + "'");
}
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
index 67c067d..d292e39 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.server.common.serialization;
-import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.Readable;
@@ -46,6 +45,19 @@ public abstract class AbstractApiMessageSerde implements RecordSerde<ApiMessageA
private static final short DEFAULT_FRAME_VERSION = 0;
private static final int DEFAULT_FRAME_VERSION_SIZE = ByteUtils.sizeOfUnsignedVarint(DEFAULT_FRAME_VERSION);
+ private static short unsignedIntToShort(Readable input, String entity) {
+ int val;
+ try {
+ val = input.readUnsignedVarint();
+ } catch (Exception e) {
+ throw new MetadataParseException("Error while reading " + entity, e);
+ }
+ if (val > Short.MAX_VALUE) {
+ throw new MetadataParseException("Value for " + entity + " was too large.");
+ }
+ return (short) val;
+ }
+
@Override
public int recordSize(ApiMessageAndVersion data,
ObjectSerializationCache serializationCache) {
@@ -69,16 +81,30 @@ public abstract class AbstractApiMessageSerde implements RecordSerde<ApiMessageA
@Override
public ApiMessageAndVersion read(Readable input,
int size) {
- short frameVersion = (short) input.readUnsignedVarint();
+ short frameVersion = unsignedIntToShort(input, "frame version");
+
if (frameVersion != DEFAULT_FRAME_VERSION) {
- throw new SerializationException("Could not deserialize metadata record due to unknown frame version "
- + frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)");
+ throw new MetadataParseException("Could not deserialize metadata record due to unknown frame version "
+ + frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)");
}
+ short apiKey = unsignedIntToShort(input, "type");
+ short version = unsignedIntToShort(input, "version");
- short apiKey = (short) input.readUnsignedVarint();
- short version = (short) input.readUnsignedVarint();
- ApiMessage record = apiMessageFor(apiKey);
- record.read(input, version);
+ ApiMessage record;
+ try {
+ record = apiMessageFor(apiKey);
+ } catch (Exception e) {
+ throw new MetadataParseException(e);
+ }
+ try {
+ record.read(input, version);
+ } catch (Exception e) {
+ throw new MetadataParseException("Failed to deserialize record with type " + apiKey, e);
+ }
+ if (input.remaining() > 0) {
+ throw new MetadataParseException("Found " + input.remaining() +
+ " byte(s) of garbage after " + apiKey);
+ }
return new ApiMessageAndVersion(record, version);
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/MetadataParseException.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/MetadataParseException.java
similarity index 81%
rename from metadata/src/main/java/org/apache/kafka/metadata/MetadataParseException.java
rename to server-common/src/main/java/org/apache/kafka/server/common/serialization/MetadataParseException.java
index 1c5d461..49eea2d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataParseException.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/MetadataParseException.java
@@ -14,8 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.kafka.metadata;
+package org.apache.kafka.server.common.serialization;
/**
* An exception indicating that we failed to parse a metadata entry.
@@ -26,4 +25,12 @@ public class MetadataParseException extends RuntimeException {
public MetadataParseException(String message) {
super(message);
}
+
+ public MetadataParseException(Throwable e) {
+ super(e);
+ }
+
+ public MetadataParseException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
}