Posted to commits@kafka.apache.org by jg...@apache.org on 2021/06/07 17:54:04 UTC

[kafka] branch trunk updated: KAFKA-12338; Remove unused `MetadataParser` (#10793)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 51665b9  KAFKA-12338; Remove unused `MetadataParser` (#10793)
51665b9 is described below

commit 51665b9f39f0df2f474de21ccc31af0b3b1811ae
Author: dengziming <sw...@163.com>
AuthorDate: Tue Jun 8 01:52:16 2021 +0800

    KAFKA-12338; Remove unused `MetadataParser` (#10793)
    
    `MetadataParser` duplicates `MetadataRecordSerde` and is not used anywhere, so we can remove it. It did, however, have some useful validations, which have been moved into `MetadataRecordSerde`.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/common/protocol/ByteBufferAccessor.java  |   5 +
 .../common/protocol/DataInputStreamReadable.java   |   9 ++
 .../org/apache/kafka/common/protocol/Readable.java |   1 +
 .../org/apache/kafka/metadata/MetadataParser.java  | 115 ---------------
 .../apache/kafka/metadata/MetadataParserTest.java  | 155 --------------------
 .../kafka/metadata/MetadataRecordSerdeTest.java    | 158 ++++++++++++++++++++-
 .../serialization/AbstractApiMessageSerde.java     |  42 ++++--
 .../serialization}/MetadataParseException.java     |  11 +-
 8 files changed, 213 insertions(+), 283 deletions(-)
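
For context, the framing these validations guard is a small varint envelope: an unsigned-varint frame version, followed by the record's api key and version as unsigned varints, followed by the serialized message body. Below is a minimal write-side sketch of that layout, assuming the default frame version 0; the class and method names are illustrative, not part of this commit:

    import org.apache.kafka.common.protocol.ApiMessage;
    import org.apache.kafka.common.protocol.ByteBufferAccessor;
    import org.apache.kafka.common.protocol.ObjectSerializationCache;
    import org.apache.kafka.common.utils.ByteUtils;

    import java.nio.ByteBuffer;

    public final class FrameLayoutSketch {
        // Serializes one record as [frameVersion][apiKey][version][body],
        // mirroring what AbstractApiMessageSerde writes.
        static ByteBuffer toBuffer(ApiMessage message, short version) {
            short frameVersion = 0; // matches DEFAULT_FRAME_VERSION in the diff below
            ObjectSerializationCache cache = new ObjectSerializationCache();
            int size = ByteUtils.sizeOfUnsignedVarint(frameVersion)
                + ByteUtils.sizeOfUnsignedVarint(message.apiKey())
                + ByteUtils.sizeOfUnsignedVarint(version)
                + message.size(cache, version);
            ByteBuffer buf = ByteBuffer.allocate(size);
            ByteUtils.writeUnsignedVarint(frameVersion, buf);
            ByteUtils.writeUnsignedVarint(message.apiKey(), buf);
            ByteUtils.writeUnsignedVarint(version, buf);
            message.write(new ByteBufferAccessor(buf), cache, version);
            buf.flip();
            return buf;
        }
    }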

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
index 3c5c309..bd0925d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
@@ -133,6 +133,11 @@ public class ByteBufferAccessor implements Readable, Writable {
         return ByteUtils.readVarlong(buf);
     }
 
+    @Override
+    public int remaining() {
+        return buf.remaining();
+    }
+
     public void flip() {
         buf.flip();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java b/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
index 93c6c59..70ed52d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
@@ -119,6 +119,15 @@ public class DataInputStreamReadable implements Readable, Closeable {
     }
 
     @Override
+    public int remaining() {
+        try {
+            return input.available();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
     public void close() {
         try {
             input.close();
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
index 46879cd..9c9e461 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
@@ -37,6 +37,7 @@ public interface Readable {
     ByteBuffer readByteBuffer(int length);
     int readVarint();
     long readVarlong();
+    int remaining();
 
     default String readString(int length) {
         byte[] arr = new byte[length];
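
The new Readable#remaining method exists so a deserializer can confirm it consumed the whole payload and reject trailing bytes. Note that the two implementations above differ in strictness: ByteBufferAccessor reports the exact number of bytes left, while DataInputStreamReadable delegates to InputStream.available(), which for some streams is only an estimate of what can be read without blocking. A short sketch of the check this enables (the helper name is illustrative):

    import org.apache.kafka.common.protocol.Readable;

    final class TrailingBytesCheck {
        // Throws if the input was not fully consumed, in the style of the
        // garbage check added to AbstractApiMessageSerde#read below.
        static void ensureFullyConsumed(Readable input, String context) {
            if (input.remaining() > 0) {
                throw new RuntimeException("Found " + input.remaining()
                    + " byte(s) of garbage after " + context);
            }
        }
    }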
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/MetadataParser.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataParser.java
deleted file mode 100644
index d172dc7..0000000
--- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataParser.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata;
-
-import org.apache.kafka.common.metadata.MetadataRecordType;
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.protocol.ObjectSerializationCache;
-import org.apache.kafka.common.utils.ByteUtils;
-
-import java.nio.ByteBuffer;
-
-public class MetadataParser {
-    public static final int MAX_SERIALIZED_EVENT_SIZE = 32 * 1024 * 1024;
-
-    private static short unsignedIntToShort(int val, String entity) {
-        if (val > Short.MAX_VALUE) {
-            throw new MetadataParseException("Value for " + entity + " was too large.");
-        }
-        return (short) val;
-    }
-
-    /**
-     * Parse the given buffer.
-     *
-     * @param buffer    The buffer.  Its offsets will be modified.
-     *
-     * @return          The metadata message.
-     */
-    public static ApiMessage read(ByteBuffer buffer) {
-        short type;
-        try {
-            type = unsignedIntToShort(ByteUtils.readUnsignedVarint(buffer), "type");
-        } catch (Exception e) {
-            throw new MetadataParseException("Failed to read variable-length type " +
-                "number: " + e.getClass().getSimpleName() + ": " + e.getMessage());
-        }
-        short version;
-        try {
-            version = unsignedIntToShort(ByteUtils.readUnsignedVarint(buffer), "version");
-        } catch (Exception e) {
-            throw new MetadataParseException("Failed to read variable-length " +
-                "version number: " + e.getClass().getSimpleName() + ": " + e.getMessage());
-        }
-        MetadataRecordType recordType = MetadataRecordType.fromId(type);
-        ApiMessage message = recordType.newMetadataRecord();
-        try {
-            message.read(new ByteBufferAccessor(buffer), version);
-        } catch (Exception e) {
-            throw new MetadataParseException(recordType + "#parse failed: " +
-                e.getClass().getSimpleName() + ": " + e.getMessage());
-        }
-        if (buffer.hasRemaining()) {
-            throw new MetadataParseException("Found " + buffer.remaining() +
-                " byte(s) of garbage after " + recordType);
-        }
-        return message;
-    }
-
-    /**
-     * Find the size of an API message and set up its ObjectSerializationCache.
-     *
-     * @param message   The metadata message.
-     * @param version   The metadata message version.
-     * @param cache     The object serialization cache to use.
-     *
-     * @return          The size
-     */
-    public static int size(ApiMessage message,
-                           short version,
-                           ObjectSerializationCache cache) {
-        long messageSize = message.size(cache, version);
-        long totalSize = messageSize +
-            ByteUtils.sizeOfUnsignedVarint(message.apiKey()) +
-            ByteUtils.sizeOfUnsignedVarint(version);
-        if (totalSize > MAX_SERIALIZED_EVENT_SIZE) {
-            throw new RuntimeException("Event size would be " + totalSize + ", but the " +
-                "maximum serialized event size is " + MAX_SERIALIZED_EVENT_SIZE);
-        }
-        return (int) totalSize;
-    }
-
-    /**
-     * Convert the given metadata message into a ByteBuffer.
-     *
-     * @param message   The metadata message.
-     * @param version   The metadata message version.
-     * @param cache     The object serialization cache to use.  This must have been
-     *                  initialized by calling size() previously.
-     * @param buf       The buffer to write to.
-     */
-    public static void write(ApiMessage message,
-                             short version,
-                             ObjectSerializationCache cache,
-                             ByteBuffer buf) {
-        ByteUtils.writeUnsignedVarint(message.apiKey(), buf);
-        ByteUtils.writeUnsignedVarint(version, buf);
-        message.write(new ByteBufferAccessor(buf), cache, version);
-    }
-}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
deleted file mode 100644
index 41e968c..0000000
--- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata;
-
-import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.common.metadata.PartitionRecord;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord;
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.common.protocol.ObjectSerializationCache;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@Timeout(value = 40)
-public class MetadataParserTest {
-    private static final Logger log =
-        LoggerFactory.getLogger(MetadataParserTest.class);
-
-    /**
-     * Test some serialization / deserialization round trips.
-     */
-    @Test
-    public void testRoundTrips() {
-        testRoundTrip(new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(2), (short) 0);
-        testRoundTrip(new ConfigRecord().setName("my.config.value").
-            setResourceName("foo").setResourceType((byte) 0).setValue("bar"), (short) 0);
-    }
-
-    private static void testRoundTrip(ApiMessage message, short version) {
-        ObjectSerializationCache cache = new ObjectSerializationCache();
-        int size = MetadataParser.size(message, version, cache);
-        ByteBuffer buffer = ByteBuffer.allocate(size);
-        MetadataParser.write(message, version, cache, buffer);
-        buffer.flip();
-        ApiMessage message2 = MetadataParser.read(buffer.duplicate());
-        assertEquals(message, message2);
-        assertEquals(message2, message);
-
-        ObjectSerializationCache cache2 = new ObjectSerializationCache();
-        int size2 = MetadataParser.size(message2, version, cache2);
-        assertEquals(size, size2);
-        ByteBuffer buffer2 = ByteBuffer.allocate(size);
-        MetadataParser.write(message2, version, cache2, buffer2);
-        buffer2.flip();
-        assertEquals(buffer.duplicate(), buffer2.duplicate());
-    }
-
-    /**
-     * Test attempting to serialize a message which is too big to be serialized.
-     */
-    @Test
-    public void testMaxSerializedEventSizeCheck() {
-        List<Integer> longReplicaList =
-            new ArrayList<>(MetadataParser.MAX_SERIALIZED_EVENT_SIZE / Integer.BYTES);
-        for (int i = 0; i < MetadataParser.MAX_SERIALIZED_EVENT_SIZE / Integer.BYTES; i++) {
-            longReplicaList.add(i);
-        }
-        PartitionRecord partitionRecord = new PartitionRecord().
-            setReplicas(longReplicaList);
-        ObjectSerializationCache cache = new ObjectSerializationCache();
-        assertEquals("Event size would be 33554482, but the maximum serialized event " +
-            "size is 33554432", assertThrows(RuntimeException.class, () -> {
-                MetadataParser.size(partitionRecord, (short) 0, cache);
-            }).getMessage());
-    }
-
-    /**
-     * Test attempting to parse an event which has a malformed message type varint.
-     */
-    @Test
-    public void testParsingMalformedMessageTypeVarint() {
-        ByteBuffer buffer = ByteBuffer.allocate(64);
-        buffer.clear();
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.position(0);
-        buffer.limit(64);
-        assertStartsWith("Failed to read variable-length type number",
-            assertThrows(MetadataParseException.class, () -> {
-                MetadataParser.read(buffer);
-            }).getMessage());
-    }
-
-    /**
-     * Test attempting to parse an event which has a malformed message version varint.
-     */
-    @Test
-    public void testParsingMalformedMessageVersionVarint() {
-        ByteBuffer buffer = ByteBuffer.allocate(64);
-        buffer.clear();
-        buffer.put((byte) 0x00);
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.put((byte) 0x80);
-        buffer.position(0);
-        buffer.limit(64);
-        assertStartsWith("Failed to read variable-length version number",
-            assertThrows(MetadataParseException.class, () -> {
-                MetadataParser.read(buffer);
-            }).getMessage());
-    }
-
-    /**
-     * Test attempting to parse an event which has a malformed message version varint.
-     */
-    @Test
-    public void testParsingRecordWithGarbageAtEnd() {
-        RegisterBrokerRecord message = new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(2);
-        ObjectSerializationCache cache = new ObjectSerializationCache();
-        int size = MetadataParser.size(message, (short) 0, cache);
-        ByteBuffer buffer = ByteBuffer.allocate(size + 1);
-        MetadataParser.write(message, (short) 0, cache, buffer);
-        buffer.clear();
-        assertStartsWith("Found 1 byte(s) of garbage after",
-            assertThrows(MetadataParseException.class, () -> {
-                MetadataParser.read(buffer);
-            }).getMessage());
-    }
-
-    private static void assertStartsWith(String prefix, String str) {
-        assertTrue(str.startsWith(prefix),
-            "Expected string '" + str + "' to start with '" + prefix + "'");
-    }
-}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
index 77906e7..3a25b8d 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
@@ -17,18 +17,20 @@
 package org.apache.kafka.metadata;
 
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.serialization.MetadataParseException;
 import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MetadataRecordSerdeTest {
 
@@ -65,8 +67,158 @@ class MetadataRecordSerdeTest {
         buffer.flip();
 
         MetadataRecordSerde serde = new MetadataRecordSerde();
-        assertThrows(SerializationException.class,
-            () -> serde.read(new ByteBufferAccessor(buffer), 16));
+        assertStartsWith("Could not deserialize metadata record due to unknown frame version",
+                assertThrows(MetadataParseException.class,
+                        () -> serde.read(new ByteBufferAccessor(buffer), 16)).getMessage());
+    }
+
+    /**
+     * Test attempting to parse an event which has a malformed frame version type varint.
+     */
+    @Test
+    public void testParsingMalformedFrameVersionVarint() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        ByteBuffer buffer = ByteBuffer.allocate(64);
+        buffer.clear();
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.position(0);
+        buffer.limit(64);
+        assertStartsWith("Error while reading frame version",
+                assertThrows(MetadataParseException.class,
+                        () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+    }
+
+    /**
+     * Test attempting to parse an event which has a malformed message type varint.
+     */
+    @Test
+    public void testParsingMalformedMessageTypeVarint() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        ByteBuffer buffer = ByteBuffer.allocate(64);
+        buffer.clear();
+        buffer.put((byte) 0x00);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.position(0);
+        buffer.limit(64);
+        assertStartsWith("Error while reading type",
+                assertThrows(MetadataParseException.class,
+                        () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+    }
+
+    /**
+     * Test attempting to parse an event which has a malformed message version varint.
+     */
+    @Test
+    public void testParsingMalformedMessageVersionVarint() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        ByteBuffer buffer = ByteBuffer.allocate(64);
+        buffer.clear();
+        buffer.put((byte) 0x00);
+        buffer.put((byte) 0x08);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.position(0);
+        buffer.limit(64);
+        assertStartsWith("Error while reading version",
+                assertThrows(MetadataParseException.class,
+                        () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+    }
+
+    /**
+     * Test attempting to parse an event which has a version > Short.MAX_VALUE.
+     */
+    @Test
+    public void testParsingVersionTooLarge() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        ByteBuffer buffer = ByteBuffer.allocate(64);
+        buffer.clear();
+        buffer.put((byte) 0x00); // frame version
+        buffer.put((byte) 0x08); // apiKey
+        buffer.put((byte) 0xff); // api version
+        buffer.put((byte) 0xff); // api version
+        buffer.put((byte) 0xff); // api version
+        buffer.put((byte) 0x7f); // api version end
+        buffer.put((byte) 0x80);
+        buffer.position(0);
+        buffer.limit(64);
+        assertStartsWith("Value for version was too large",
+                assertThrows(MetadataParseException.class,
+                        () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+    }
+
+    /**
+     * Test attempting to parse an event which has an unsupported api key.
+     */
+    @Test
+    public void testParsingUnsupportedApiKey() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        ByteBuffer buffer = ByteBuffer.allocate(64);
+        buffer.put((byte) 0x00); // frame version
+        buffer.put((byte) 0xff); // apiKey
+        buffer.put((byte) 0x7f); // apiKey
+        buffer.put((byte) 0x00); // api version
+        buffer.put((byte) 0x80);
+        buffer.position(0);
+        buffer.limit(64);
+        assertStartsWith("Unknown metadata id ",
+                assertThrows(MetadataParseException.class,
+                        () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getCause().getMessage());
+    }
+
+    /**
+     * Test attempting to parse an event which has a malformed message body.
+     */
+    @Test
+    public void testParsingMalformedMessage() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        ByteBuffer buffer = ByteBuffer.allocate(4);
+        buffer.put((byte) 0x00); // frame version
+        buffer.put((byte) 0x00); // apiKey
+        buffer.put((byte) 0x00); // apiVersion
+        buffer.put((byte) 0x80); // malformed data
+        buffer.position(0);
+        buffer.limit(4);
+        assertStartsWith("Failed to deserialize record with type",
+                assertThrows(MetadataParseException.class,
+                        () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
+    }
+
+    /**
+     * Test attempting to parse an event which has garbage at the end.
+     */
+    @Test
+    public void testParsingRecordWithGarbageAtEnd() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        RegisterBrokerRecord message = new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(2);
+
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        ApiMessageAndVersion messageAndVersion = new ApiMessageAndVersion(message, (short) 0);
+        int size = serde.recordSize(messageAndVersion, cache);
+        ByteBuffer buffer = ByteBuffer.allocate(size + 1);
+
+        serde.write(messageAndVersion, cache, new ByteBufferAccessor(buffer));
+        buffer.clear();
+        assertStartsWith("Found 1 byte(s) of garbage after",
+                assertThrows(MetadataParseException.class,
+                        () -> serde.read(new ByteBufferAccessor(buffer), size + 1)).getMessage());
+    }
+
+    private static void assertStartsWith(String prefix, String str) {
+        assertTrue(str.startsWith(prefix),
+                "Expected string '" + str + "' to start with '" + prefix + "'");
     }
 
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
index 67c067d..d292e39 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.server.common.serialization;
 
-import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.protocol.Readable;
@@ -46,6 +45,19 @@ public abstract class AbstractApiMessageSerde implements RecordSerde<ApiMessageA
     private static final short DEFAULT_FRAME_VERSION = 0;
     private static final int DEFAULT_FRAME_VERSION_SIZE = ByteUtils.sizeOfUnsignedVarint(DEFAULT_FRAME_VERSION);
 
+    private static short unsignedIntToShort(Readable input, String entity) {
+        int val;
+        try {
+            val = input.readUnsignedVarint();
+        } catch (Exception e) {
+            throw new MetadataParseException("Error while reading " + entity, e);
+        }
+        if (val > Short.MAX_VALUE) {
+            throw new MetadataParseException("Value for " + entity + " was too large.");
+        }
+        return (short) val;
+    }
+
     @Override
     public int recordSize(ApiMessageAndVersion data,
                           ObjectSerializationCache serializationCache) {
@@ -69,16 +81,30 @@ public abstract class AbstractApiMessageSerde implements RecordSerde<ApiMessageA
     @Override
     public ApiMessageAndVersion read(Readable input,
                                      int size) {
-        short frameVersion = (short) input.readUnsignedVarint();
+        short frameVersion = unsignedIntToShort(input, "frame version");
+
         if (frameVersion != DEFAULT_FRAME_VERSION) {
-            throw new SerializationException("Could not deserialize metadata record due to unknown frame version "
-                                                     + frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)");
+            throw new MetadataParseException("Could not deserialize metadata record due to unknown frame version "
+                    + frameVersion + " (only frame version " + DEFAULT_FRAME_VERSION + " is supported)");
         }
+        short apiKey = unsignedIntToShort(input, "type");
+        short version = unsignedIntToShort(input, "version");
 
-        short apiKey = (short) input.readUnsignedVarint();
-        short version = (short) input.readUnsignedVarint();
-        ApiMessage record = apiMessageFor(apiKey);
-        record.read(input, version);
+        ApiMessage record;
+        try {
+            record = apiMessageFor(apiKey);
+        } catch (Exception e) {
+            throw new MetadataParseException(e);
+        }
+        try {
+            record.read(input, version);
+        } catch (Exception e) {
+            throw new MetadataParseException("Failed to deserialize record with type " + apiKey, e);
+        }
+        if (input.remaining() > 0) {
+            throw new MetadataParseException("Found " + input.remaining() +
+                    " byte(s) of garbage after " + apiKey);
+        }
         return new ApiMessageAndVersion(record, version);
     }
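
For callers, the net effect of this rework is that every parse failure (malformed varints, oversized version values, unknown api keys, truncated bodies, trailing garbage) now surfaces uniformly as MetadataParseException rather than a mix of SerializationException and unchecked errors. A round-trip usage sketch, with an illustrative class name and record values:

    import org.apache.kafka.common.metadata.TopicRecord;
    import org.apache.kafka.common.protocol.ByteBufferAccessor;
    import org.apache.kafka.common.protocol.ObjectSerializationCache;
    import org.apache.kafka.metadata.MetadataRecordSerde;
    import org.apache.kafka.server.common.ApiMessageAndVersion;
    import org.apache.kafka.server.common.serialization.MetadataParseException;

    import java.nio.ByteBuffer;

    public final class SerdeRoundTripSketch {
        public static void main(String[] args) {
            MetadataRecordSerde serde = new MetadataRecordSerde();
            ApiMessageAndVersion record = new ApiMessageAndVersion(
                new TopicRecord().setName("foo"), (short) 0);

            ObjectSerializationCache cache = new ObjectSerializationCache();
            int size = serde.recordSize(record, cache);
            ByteBuffer buffer = ByteBuffer.allocate(size);
            serde.write(record, cache, new ByteBufferAccessor(buffer));
            buffer.flip();

            try {
                ApiMessageAndVersion parsed =
                    serde.read(new ByteBufferAccessor(buffer), size);
                System.out.println("Round trip OK: " + parsed);
            } catch (MetadataParseException e) {
                // All corruption modes funnel through this one exception type.
                System.err.println("Corrupt metadata record: " + e.getMessage());
            }
        }
    }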
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/MetadataParseException.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/MetadataParseException.java
similarity index 81%
rename from metadata/src/main/java/org/apache/kafka/metadata/MetadataParseException.java
rename to server-common/src/main/java/org/apache/kafka/server/common/serialization/MetadataParseException.java
index 1c5d461..49eea2d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataParseException.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/MetadataParseException.java
@@ -14,8 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.kafka.metadata;
+package org.apache.kafka.server.common.serialization;
 
 /**
  * An exception indicating that we failed to parse a metadata entry.
@@ -26,4 +25,12 @@ public class MetadataParseException extends RuntimeException {
     public MetadataParseException(String message) {
         super(message);
     }
+
+    public MetadataParseException(Throwable e) {
+        super(e);
+    }
+
+    public MetadataParseException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
 }
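
The two added constructors matter because they preserve the underlying cause, which the updated tests rely on (testParsingUnsupportedApiKey unwraps it via getCause()). A tiny illustration, using a hypothetical cause message:

    import org.apache.kafka.server.common.serialization.MetadataParseException;

    public final class CauseChainSketch {
        public static void main(String[] args) {
            MetadataParseException wrapped = new MetadataParseException(
                new IllegalArgumentException("Unknown metadata id 1000"));
            // The original failure stays reachable for diagnostics:
            System.out.println(wrapped.getCause().getMessage());
        }
    }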