You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2015/12/18 19:27:19 UTC

kafka git commit: MINOR: Change return type of `Schema.read` to be `Struct` instead of `Object`

Repository: kafka
Updated Branches:
  refs/heads/trunk 9220df9f8 -> 8c754c45a


MINOR: Change return type of `Schema.read` to be `Struct` instead of `Object`

We always return a `Struct` from `Schema.read` and this means that
we can remove a large number of casts.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #684 from ijuma/schema-read-should-return-struct


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c754c45
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c754c45
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c754c45

Branch: refs/heads/trunk
Commit: 8c754c45af57a3249232268c6093cb4f50e1d45f
Parents: 9220df9
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Dec 18 10:27:03 2015 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Dec 18 10:27:03 2015 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/clients/NetworkClient.java    | 2 +-
 .../kafka/clients/consumer/internals/ConsumerProtocol.java   | 8 ++++----
 .../java/org/apache/kafka/common/protocol/ProtoUtils.java    | 6 +++---
 .../java/org/apache/kafka/common/protocol/types/Schema.java  | 8 ++++++--
 .../kafka/common/requests/ControlledShutdownRequest.java     | 2 +-
 .../kafka/common/requests/ControlledShutdownResponse.java    | 2 +-
 .../apache/kafka/common/requests/DescribeGroupsRequest.java  | 2 +-
 .../apache/kafka/common/requests/DescribeGroupsResponse.java | 2 +-
 .../java/org/apache/kafka/common/requests/FetchRequest.java  | 2 +-
 .../java/org/apache/kafka/common/requests/FetchResponse.java | 4 ++--
 .../kafka/common/requests/GroupCoordinatorRequest.java       | 2 +-
 .../kafka/common/requests/GroupCoordinatorResponse.java      | 4 ++--
 .../org/apache/kafka/common/requests/HeartbeatRequest.java   | 4 ++--
 .../org/apache/kafka/common/requests/HeartbeatResponse.java  | 4 ++--
 .../org/apache/kafka/common/requests/JoinGroupRequest.java   | 2 +-
 .../org/apache/kafka/common/requests/JoinGroupResponse.java  | 4 ++--
 .../apache/kafka/common/requests/LeaderAndIsrRequest.java    | 2 +-
 .../apache/kafka/common/requests/LeaderAndIsrResponse.java   | 2 +-
 .../org/apache/kafka/common/requests/LeaveGroupRequest.java  | 2 +-
 .../org/apache/kafka/common/requests/LeaveGroupResponse.java | 4 ++--
 .../org/apache/kafka/common/requests/ListGroupsRequest.java  | 2 +-
 .../org/apache/kafka/common/requests/ListGroupsResponse.java | 2 +-
 .../org/apache/kafka/common/requests/ListOffsetRequest.java  | 2 +-
 .../org/apache/kafka/common/requests/ListOffsetResponse.java | 2 +-
 .../org/apache/kafka/common/requests/MetadataRequest.java    | 2 +-
 .../org/apache/kafka/common/requests/MetadataResponse.java   | 4 ++--
 .../apache/kafka/common/requests/OffsetCommitRequest.java    | 4 ++--
 .../apache/kafka/common/requests/OffsetCommitResponse.java   | 2 +-
 .../org/apache/kafka/common/requests/OffsetFetchRequest.java | 2 +-
 .../apache/kafka/common/requests/OffsetFetchResponse.java    | 2 +-
 .../org/apache/kafka/common/requests/ProduceRequest.java     | 2 +-
 .../org/apache/kafka/common/requests/ProduceResponse.java    | 2 +-
 .../java/org/apache/kafka/common/requests/RequestHeader.java | 2 +-
 .../org/apache/kafka/common/requests/ResponseHeader.java     | 2 +-
 .../org/apache/kafka/common/requests/StopReplicaRequest.java | 2 +-
 .../apache/kafka/common/requests/StopReplicaResponse.java    | 2 +-
 .../org/apache/kafka/common/requests/SyncGroupResponse.java  | 2 +-
 .../apache/kafka/common/requests/UpdateMetadataRequest.java  | 2 +-
 .../apache/kafka/common/requests/UpdateMetadataResponse.java | 2 +-
 .../kafka/connect/runtime/distributed/ConnectProtocol.java   | 8 ++++----
 .../main/scala/kafka/coordinator/GroupMetadataManager.scala  | 6 +++---
 41 files changed, 64 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 52db61a..232a3cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -461,7 +461,7 @@ public class NetworkClient implements KafkaClient {
             // Always expect the response version id to be the same as the request version id
             short apiKey = req.request().header().apiKey();
             short apiVer = req.request().header().apiVersion();
-            Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
+            Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
             correlate(req.request().header(), header);
             if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                 responses.add(new ClientResponse(req, now, false, body));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 4728a50..3f87995 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -91,10 +91,10 @@ public class ConsumerProtocol {
     }
 
     public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
-        Struct header = (Struct) CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
         checkVersionCompatibility(version);
-        Struct struct = (Struct) SUBSCRIPTION_V0.read(buffer);
+        Struct struct = SUBSCRIPTION_V0.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
         for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
@@ -103,10 +103,10 @@ public class ConsumerProtocol {
     }
 
     public static PartitionAssignor.Assignment deserializeAssignment(ByteBuffer buffer) {
-        Struct header = (Struct) CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
         checkVersionCompatibility(version);
-        Struct struct = (Struct) ASSIGNMENT_V0.read(buffer);
+        Struct struct = ASSIGNMENT_V0.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<TopicPartition> partitions = new ArrayList<>();
         for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
index 85357ab..9f38737 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -57,15 +57,15 @@ public class ProtoUtils {
     }
 
     public static Struct parseRequest(int apiKey, int version, ByteBuffer buffer) {
-        return (Struct) requestSchema(apiKey, version).read(buffer);
+        return requestSchema(apiKey, version).read(buffer);
     }
 
     public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
-        return (Struct) currentResponseSchema(apiKey).read(buffer);
+        return currentResponseSchema(apiKey).read(buffer);
     }
 
     public static Struct parseResponse(int apiKey, int version, ByteBuffer buffer) {
-        return (Struct) responseSchema(apiKey, version).read(buffer);
+        return responseSchema(apiKey, version).read(buffer);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 3a14ac0..e8dce31 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -44,6 +44,7 @@ public class Schema extends Type {
     /**
      * Write a struct to the buffer
      */
+    @Override
     public void write(ByteBuffer buffer, Object o) {
         Struct r = (Struct) o;
         for (int i = 0; i < fields.length; i++) {
@@ -62,7 +63,8 @@ public class Schema extends Type {
     /**
      * Read a struct from the buffer
      */
-    public Object read(ByteBuffer buffer) {
+    @Override
+    public Struct read(ByteBuffer buffer) {
         Object[] objects = new Object[fields.length];
         for (int i = 0; i < fields.length; i++) {
             try {
@@ -79,6 +81,7 @@ public class Schema extends Type {
     /**
      * The size of the given record
      */
+    @Override
     public int sizeOf(Object o) {
         int size = 0;
         Struct r = (Struct) o;
@@ -124,6 +127,7 @@ public class Schema extends Type {
     /**
      * Display a string representation of the schema
      */
+    @Override
     public String toString() {
         StringBuilder b = new StringBuilder();
         b.append('{');
@@ -156,4 +160,4 @@ public class Schema extends Type {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 57f51d8..9ac127d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -64,6 +64,6 @@ public class ControlledShutdownRequest extends AbstractRequest {
     }
 
     public static ControlledShutdownRequest parse(ByteBuffer buffer) {
-        return new ControlledShutdownRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new ControlledShutdownRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 1f4af7c..add3fa6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -82,7 +82,7 @@ public class ControlledShutdownResponse extends AbstractRequestResponse {
     }
 
     public static ControlledShutdownResponse parse(ByteBuffer buffer) {
-        return new ControlledShutdownResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new ControlledShutdownResponse(CURRENT_SCHEMA.read(buffer));
     }
 
     public static ControlledShutdownResponse parse(ByteBuffer buffer, int version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index a545cca..a870b8f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -62,7 +62,7 @@ public class DescribeGroupsRequest extends AbstractRequest {
     }
 
     public static DescribeGroupsRequest parse(ByteBuffer buffer) {
-        return new DescribeGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new DescribeGroupsRequest(CURRENT_SCHEMA.read(buffer));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index c71e7d2..2d4faee 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -210,7 +210,7 @@ public class DescribeGroupsResponse extends AbstractRequestResponse {
     }
 
     public static DescribeGroupsResponse parse(ByteBuffer buffer) {
-        return new DescribeGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new DescribeGroupsResponse(CURRENT_SCHEMA.read(buffer));
     }
 
     public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index feb4109..a7d8349 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -160,6 +160,6 @@ public class FetchRequest extends AbstractRequest {
     }
 
     public static FetchRequest parse(ByteBuffer buffer) {
-        return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new FetchRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 7b78415..f28472f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -156,10 +156,10 @@ public class FetchResponse extends AbstractRequestResponse {
     }
 
     public static FetchResponse parse(ByteBuffer buffer) {
-        return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new FetchResponse(CURRENT_SCHEMA.read(buffer));
     }
 
     public static FetchResponse parse(ByteBuffer buffer, int version) {
-        return new FetchResponse((Struct) ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer));
+        return new FetchResponse(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
index 8c56e7f..0b98e55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
@@ -60,6 +60,6 @@ public class GroupCoordinatorRequest extends AbstractRequest {
     }
 
     public static GroupCoordinatorRequest parse(ByteBuffer buffer) {
-        return new GroupCoordinatorRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new GroupCoordinatorRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
index 3fe014d..8e7beb4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -74,6 +74,6 @@ public class GroupCoordinatorResponse extends AbstractRequestResponse {
     }
 
     public static GroupCoordinatorResponse parse(ByteBuffer buffer) {
-        return new GroupCoordinatorResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new GroupCoordinatorResponse(CURRENT_SCHEMA.read(buffer));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 74be3ed..02eaa99 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -76,6 +76,6 @@ public class HeartbeatRequest extends AbstractRequest {
     }
 
     public static HeartbeatRequest parse(ByteBuffer buffer) {
-        return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new HeartbeatRequest(CURRENT_SCHEMA.read(buffer));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index a595efb..7fe227c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -52,6 +52,6 @@ public class HeartbeatResponse extends AbstractRequestResponse {
     }
 
     public static HeartbeatResponse parse(ByteBuffer buffer) {
-        return new HeartbeatResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new HeartbeatResponse(CURRENT_SCHEMA.read(buffer));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index cae07bc..14a6c1d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -145,6 +145,6 @@ public class JoinGroupRequest extends AbstractRequest {
     }
 
     public static JoinGroupRequest parse(ByteBuffer buffer) {
-        return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new JoinGroupRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index c869b1e..dd829ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -136,6 +136,6 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     }
 
     public static JoinGroupResponse parse(ByteBuffer buffer) {
-        return new JoinGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new JoinGroupResponse(CURRENT_SCHEMA.read(buffer));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 002beef..9932fbb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -206,7 +206,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     }
 
     public static LeaderAndIsrRequest parse(ByteBuffer buffer) {
-        return new LeaderAndIsrRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new LeaderAndIsrRequest(CURRENT_SCHEMA.read(buffer));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 3a6f4ee..df57714 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -99,7 +99,7 @@ public class LeaderAndIsrResponse extends AbstractRequestResponse {
     }
 
     public static LeaderAndIsrResponse parse(ByteBuffer buffer) {
-        return new LeaderAndIsrResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new LeaderAndIsrResponse(CURRENT_SCHEMA.read(buffer));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 05bdf90..3047193 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -66,6 +66,6 @@ public class LeaveGroupRequest extends AbstractRequest {
     }
 
     public static LeaveGroupRequest parse(ByteBuffer buffer) {
-        return new LeaveGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new LeaveGroupRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index e0ca117..6481ca7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -50,6 +50,6 @@ public class LeaveGroupResponse extends AbstractRequestResponse {
     }
 
     public static LeaveGroupResponse parse(ByteBuffer buffer) {
-        return new LeaveGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new LeaveGroupResponse(CURRENT_SCHEMA.read(buffer));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 439720f..3160702 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -50,7 +50,7 @@ public class ListGroupsRequest extends AbstractRequest {
     }
 
     public static ListGroupsRequest parse(ByteBuffer buffer) {
-        return new ListGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new ListGroupsRequest(CURRENT_SCHEMA.read(buffer));
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index d07f0d1..5519670 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -97,7 +97,7 @@ public class ListGroupsResponse extends AbstractRequestResponse {
     }
 
     public static ListGroupsResponse parse(ByteBuffer buffer) {
-        return new ListGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new ListGroupsResponse(CURRENT_SCHEMA.read(buffer));
     }
 
     public static ListGroupsResponse fromError(Errors error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 2a91637..a3777e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -143,6 +143,6 @@ public class ListOffsetRequest extends AbstractRequest {
     }
 
     public static ListOffsetRequest parse(ByteBuffer buffer) {
-        return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new ListOffsetRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index f706086..5befe14 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -113,6 +113,6 @@ public class ListOffsetResponse extends AbstractRequestResponse {
     }
 
     public static ListOffsetResponse parse(ByteBuffer buffer) {
-        return new ListOffsetResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new ListOffsetResponse(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index cc1771b..a6c249f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -77,6 +77,6 @@ public class MetadataRequest extends AbstractRequest {
     }
 
     public static MetadataRequest parse(ByteBuffer buffer) {
-        return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new MetadataRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 170e4b8..805b9e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -201,6 +201,6 @@ public class MetadataResponse extends AbstractRequestResponse {
     }
 
     public static MetadataResponse parse(ByteBuffer buffer) {
-        return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new MetadataResponse(CURRENT_SCHEMA.read(buffer));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 8721efa..df18486 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -257,10 +257,10 @@ public class OffsetCommitRequest extends AbstractRequest {
 
     public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
         Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
-        return new OffsetCommitRequest((Struct) schema.read(buffer));
+        return new OffsetCommitRequest(schema.read(buffer));
     }
 
     public static OffsetCommitRequest parse(ByteBuffer buffer) {
-        return new OffsetCommitRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new OffsetCommitRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 4d10a91..9b53fb4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -98,6 +98,6 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
     }
 
     public static OffsetCommitResponse parse(ByteBuffer buffer) {
-        return new OffsetCommitResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new OffsetCommitResponse(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 6ee7597..422328e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -118,6 +118,6 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     public static OffsetFetchRequest parse(ByteBuffer buffer) {
-        return new OffsetFetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new OffsetFetchRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 88d68ea..a76f48e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -123,6 +123,6 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
     }
 
     public static OffsetFetchResponse parse(ByteBuffer buffer) {
-        return new OffsetFetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new OffsetFetchResponse(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 5663f2c..0581f84 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -127,6 +127,6 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     public static ProduceRequest parse(ByteBuffer buffer) {
-        return new ProduceRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new ProduceRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 2868550..fc41307 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -149,6 +149,6 @@ public class ProduceResponse extends AbstractRequestResponse {
     }
 
     public static ProduceResponse parse(ByteBuffer buffer) {
-        return new ProduceResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new ProduceResponse(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 14bcde7..f7b75b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -77,6 +77,6 @@ public class RequestHeader extends AbstractRequestResponse {
     }
 
     public static RequestHeader parse(ByteBuffer buffer) {
-        return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
+        return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
index e8a7ef9..e68bd39 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
@@ -50,7 +50,7 @@ public class ResponseHeader extends AbstractRequestResponse {
     }
 
     public static ResponseHeader parse(ByteBuffer buffer) {
-        return new ResponseHeader((Struct) Protocol.RESPONSE_HEADER.read(buffer));
+        return new ResponseHeader(Protocol.RESPONSE_HEADER.read(buffer));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 85ac394..11c2b75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -115,6 +115,6 @@ public class StopReplicaRequest extends AbstractRequest {
     }
 
     public static StopReplicaRequest parse(ByteBuffer buffer) {
-        return new StopReplicaRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new StopReplicaRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 4fa1cac..a8e736c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -96,6 +96,6 @@ public class StopReplicaResponse extends AbstractRequestResponse {
     }
 
     public static StopReplicaResponse parse(ByteBuffer buffer) {
-        return new StopReplicaResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new StopReplicaResponse(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 7256bd2..3584f14 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -69,7 +69,7 @@ public class SyncGroupResponse extends AbstractRequestResponse {
     }
 
     public static SyncGroupResponse parse(ByteBuffer buffer) {
-        return new SyncGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new SyncGroupResponse(CURRENT_SCHEMA.read(buffer));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index f9f76be..4c65ce9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -286,6 +286,6 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     public static UpdateMetadataRequest parse(ByteBuffer buffer) {
-        return new UpdateMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
+        return new UpdateMetadataRequest(CURRENT_SCHEMA.read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 5bec437..0ff35d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -49,7 +49,7 @@ public class UpdateMetadataResponse extends AbstractRequestResponse {
     }
 
     public static UpdateMetadataResponse parse(ByteBuffer buffer) {
-        return new UpdateMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
+        return new UpdateMetadataResponse(CURRENT_SCHEMA.read(buffer));
     }
 
     public static UpdateMetadataResponse parse(ByteBuffer buffer, int version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
index 971873f..da5286c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
@@ -83,10 +83,10 @@ public class ConnectProtocol {
     }
 
     public static WorkerState deserializeMetadata(ByteBuffer buffer) {
-        Struct header = (Struct) CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Struct header = CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
         checkVersionCompatibility(version);
-        Struct struct = (Struct) CONFIG_STATE_V0.read(buffer);
+        Struct struct = CONFIG_STATE_V0.read(buffer);
         long configOffset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
         String url = struct.getString(URL_KEY_NAME);
         return new WorkerState(url, configOffset);
@@ -116,10 +116,10 @@ public class ConnectProtocol {
     }
 
     public static Assignment deserializeAssignment(ByteBuffer buffer) {
-        Struct header = (Struct) CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Struct header = CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
         checkVersionCompatibility(version);
-        Struct struct = (Struct) ASSIGNMENT_V0.read(buffer);
+        Struct struct = ASSIGNMENT_V0.read(buffer);
         short error = struct.getShort(ERROR_KEY_NAME);
         String leader = struct.getString(LEADER_KEY_NAME);
         String leaderUrl = struct.getString(LEADER_URL_KEY_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c754c45/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 71d2338..e502d38 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -850,7 +850,7 @@ object GroupMetadataManager {
   def readMessageKey(buffer: ByteBuffer): BaseKey = {
     val version = buffer.getShort
     val keySchema = schemaForKey(version)
-    val key = keySchema.read(buffer).asInstanceOf[Struct]
+    val key = keySchema.read(buffer)
 
     if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
       // version 0 and 1 refer to offset
@@ -882,7 +882,7 @@ object GroupMetadataManager {
     } else {
       val version = buffer.getShort
       val valueSchema = schemaForOffset(version)
-      val value = valueSchema.read(buffer).asInstanceOf[Struct]
+      val value = valueSchema.read(buffer)
 
       if (version == 0) {
         val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
@@ -915,7 +915,7 @@ object GroupMetadataManager {
     } else {
       val version = buffer.getShort
       val valueSchema = schemaForGroup(version)
-      val value = valueSchema.read(buffer).asInstanceOf[Struct]
+      val value = valueSchema.read(buffer)
 
       if (version == 0) {
         val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V0).asInstanceOf[String]