You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:50 UTC

[6/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 9f8e981..36094b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -20,7 +20,12 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 
-import static org.apache.kafka.common.protocol.types.Type.*;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class Protocol {
 
@@ -180,31 +185,31 @@ public class Protocol {
 
     public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
                                                                                STRING,
-                                                                               "The consumer group id."),
+                                                                               "The group id."),
                                                                      new Field("topics",
                                                                                new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
                                                                                "Topics to commit offsets."));
 
     public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
                                                                                STRING,
-                                                                               "The consumer group id."),
+                                                                               "The group id."),
                                                                      new Field("group_generation_id",
                                                                                INT32,
-                                                                               "The generation of the consumer group."),
-                                                                     new Field("consumer_id",
+                                                                               "The generation of the group."),
+                                                                     new Field("member_id",
                                                                                STRING,
-                                                                               "The consumer id assigned by the group coordinator."),
+                                                                               "The member id assigned by the group coordinator."),
                                                                      new Field("topics",
                                                                                new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
                                                                                "Topics to commit offsets."));
 
     public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
                                                                                STRING,
-                                                                               "The consumer group id."),
+                                                                               "The group id."),
                                                                      new Field("group_generation_id",
                                                                                INT32,
                                                                                "The generation of the consumer group."),
-                                                                     new Field("consumer_id",
+                                                                     new Field("member_id",
                                                                                STRING,
                                                                                "The consumer id assigned by the group coordinator."),
                                                                      new Field("retention_time",
@@ -384,17 +389,17 @@ public class Protocol {
     public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};
 
     /* Consumer metadata api */
-    public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                                   STRING,
-                                                                                   "The consumer group id."));
+    public static final Schema GROUP_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                                STRING,
+                                                                                "The unique group id."));
 
-    public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                          new Field("coordinator",
-                                                                                    BROKER,
-                                                                                    "Host and port information for the coordinator for a consumer group."));
+    public static final Schema GROUP_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+                                                                       new Field("coordinator",
+                                                                                 BROKER,
+                                                                                 "Host and port information for the coordinator for a consumer group."));
 
-    public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
-    public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
+    public static final Schema[] GROUP_METADATA_REQUEST = new Schema[] {GROUP_METADATA_REQUEST_V0};
+    public static final Schema[] GROUP_METADATA_RESPONSE = new Schema[] {GROUP_METADATA_RESPONSE_V0};
 
     /* Controlled shutdown api */
     public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
@@ -416,45 +421,67 @@ public class Protocol {
     public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = new Schema[] {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
 
     /* Join group api */
+    public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new Field("protocol_name", STRING),
+                                                                           new Field("protocol_metadata", BYTES));
+
     public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
                                                                             STRING,
-                                                                            "The consumer group id."),
+                                                                            "The group id."),
                                                                   new Field("session_timeout",
                                                                             INT32,
                                                                             "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
-                                                                  new Field("topics",
-                                                                            new ArrayOf(STRING),
-                                                                            "An array of topics to subscribe to."),
-                                                                  new Field("consumer_id",
+                                                                  new Field("member_id",
                                                                             STRING,
                                                                             "The assigned consumer id or an empty string for a new consumer."),
-                                                                  new Field("partition_assignment_strategy",
+                                                                  new Field("protocol_type",
                                                                             STRING,
-                                                                            "The strategy for the coordinator to assign partitions."));
+                                                                            "Unique name for class of protocols implemented by group"),
+                                                                  new Field("group_protocols",
+                                                                            new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
+                                                                            "List of protocols that the member supports"));
+
 
-    public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                         new Field("partitions", new ArrayOf(INT32)));
+    public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING),
+                                                                          new Field("member_metadata", BYTES));
     public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                   new Field("group_generation_id",
+                                                                   new Field("generation_id",
                                                                              INT32,
                                                                              "The generation of the consumer group."),
-                                                                   new Field("consumer_id",
+                                                                   new Field("group_protocol",
+                                                                             STRING,
+                                                                             "The group protocol selected by the coordinator"),
+                                                                   new Field("leader_id",
+                                                                             STRING,
+                                                                             "The leader of the group"),
+                                                                   new Field("member_id",
                                                                              STRING,
                                                                              "The consumer id assigned by the group coordinator."),
-                                                                   new Field("assigned_partitions",
-                                                                             new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+                                                                   new Field("members",
+                                                                             new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
 
     public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
     public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
 
+    /* SyncGroup api */
+    public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),
+                                                                         new Field("member_assignment", BYTES));
+    public static final Schema SYNC_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING),
+                                                                  new Field("generation_id", INT32),
+                                                                  new Field("member_id", STRING),
+                                                                  new Field("group_assignment", new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
+    public static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+                                                                   new Field("member_assignment", BYTES));
+    public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0};
+    public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0};
+
     /* Heartbeat api */
     public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
                                                                  new Field("group_generation_id",
                                                                            INT32,
                                                                            "The generation of the consumer group."),
-                                                                 new Field("consumer_id",
+                                                                 new Field("member_id",
                                                                            STRING,
-                                                                           "The consumer id assigned by the group coordinator."));
+                                                                           "The member id assigned by the group coordinator."));
 
     public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
 
@@ -589,10 +616,11 @@ public class Protocol {
         REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST;
         REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
         REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
-        REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
+        REQUESTS[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_REQUEST;
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
         REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST;
+        REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST;
 
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
@@ -605,10 +633,11 @@ public class Protocol {
         RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
-        RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
+        RESPONSES[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_RESPONSE;
         RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
         RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
         RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE;
+        RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 095cd52..03e77a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -49,14 +49,16 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return OffsetCommitRequest.parse(buffer, versionId);
             case OFFSET_FETCH:
                 return OffsetFetchRequest.parse(buffer, versionId);
-            case CONSUMER_METADATA:
-                return ConsumerMetadataRequest.parse(buffer, versionId);
+            case GROUP_METADATA:
+                return GroupMetadataRequest.parse(buffer, versionId);
             case JOIN_GROUP:
                 return JoinGroupRequest.parse(buffer, versionId);
             case HEARTBEAT:
                 return HeartbeatRequest.parse(buffer, versionId);
             case LEAVE_GROUP:
                 return LeaveGroupRequest.parse(buffer, versionId);
+            case SYNC_GROUP:
+                return SyncGroupRequest.parse(buffer, versionId);
             case STOP_REPLICA:
                 return StopReplicaRequest.parse(buffer, versionId);
             case CONTROLLED_SHUTDOWN_KEY:

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
deleted file mode 100644
index 5b3e04a..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ConsumerMetadataRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-
-    private final String groupId;
-
-    public ConsumerMetadataRequest(String groupId) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        this.groupId = groupId;
-    }
-
-    public ConsumerMetadataRequest(Struct struct) {
-        super(struct);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        switch (versionId) {
-            case 0:
-                return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONSUMER_METADATA.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public static ConsumerMetadataRequest parse(ByteBuffer buffer, int versionId) {
-        return new ConsumerMetadataRequest(ProtoUtils.parseRequest(ApiKeys.CONSUMER_METADATA.id, versionId, buffer));
-    }
-
-    public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
-        return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
deleted file mode 100644
index 0c250c3..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ConsumerMetadataResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private static final String COORDINATOR_KEY_NAME = "coordinator";
-
-    // coordinator level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-
-    private final short errorCode;
-    private final Node node;
-
-    public ConsumerMetadataResponse(short errorCode, Node node) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
-        coordinator.set(NODE_ID_KEY_NAME, node.id());
-        coordinator.set(HOST_KEY_NAME, node.host());
-        coordinator.set(PORT_KEY_NAME, node.port());
-        struct.set(COORDINATOR_KEY_NAME, coordinator);
-        this.errorCode = errorCode;
-        this.node = node;
-    }
-
-    public ConsumerMetadataResponse(Struct struct) {
-        super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
-        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
-        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
-        String host = broker.getString(HOST_KEY_NAME);
-        int port = broker.getInt(PORT_KEY_NAME);
-        node = new Node(nodeId, host, port);
-    }
-
-    public short errorCode() {
-        return errorCode;
-    }
-
-    public Node node() {
-        return node;
-    }
-
-    public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
-        return new ConsumerMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
new file mode 100644
index 0000000..fd54c5a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupMetadataRequest extends AbstractRequest {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_METADATA.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+
+    private final String groupId;
+
+    public GroupMetadataRequest(String groupId) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        this.groupId = groupId;
+    }
+
+    public GroupMetadataRequest(Struct struct) {
+        super(struct);
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new GroupMetadataResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_METADATA.id)));
+        }
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public static GroupMetadataRequest parse(ByteBuffer buffer, int versionId) {
+        return new GroupMetadataRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_METADATA.id, versionId, buffer));
+    }
+
+    public static GroupMetadataRequest parse(ByteBuffer buffer) {
+        return new GroupMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
new file mode 100644
index 0000000..a5ef478
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupMetadataResponse extends AbstractRequestResponse {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_METADATA.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String COORDINATOR_KEY_NAME = "coordinator";
+
+    // coordinator level field names
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+
+    private final short errorCode;
+    private final Node node;
+
+    public GroupMetadataResponse(short errorCode, Node node) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+        coordinator.set(NODE_ID_KEY_NAME, node.id());
+        coordinator.set(HOST_KEY_NAME, node.host());
+        coordinator.set(PORT_KEY_NAME, node.port());
+        struct.set(COORDINATOR_KEY_NAME, coordinator);
+        this.errorCode = errorCode;
+        this.node = node;
+    }
+
+    public GroupMetadataResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+        String host = broker.getString(HOST_KEY_NAME);
+        int port = broker.getInt(PORT_KEY_NAME);
+        node = new Node(nodeId, host, port);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    public static GroupMetadataResponse parse(ByteBuffer buffer) {
+        return new GroupMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 89719f1..74be3ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -25,27 +25,27 @@ public class HeartbeatRequest extends AbstractRequest {
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String MEMBER_ID_KEY_NAME = "member_id";
 
     private final String groupId;
     private final int groupGenerationId;
-    private final String consumerId;
+    private final String memberId;
 
-    public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
+    public HeartbeatRequest(String groupId, int groupGenerationId, String memberId) {
         super(new Struct(CURRENT_SCHEMA));
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
         this.groupId = groupId;
         this.groupGenerationId = groupGenerationId;
-        this.consumerId = consumerId;
+        this.memberId = memberId;
     }
 
     public HeartbeatRequest(Struct struct) {
         super(struct);
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        memberId = struct.getString(MEMBER_ID_KEY_NAME);
     }
 
     @Override
@@ -67,8 +67,8 @@ public class HeartbeatRequest extends AbstractRequest {
         return groupGenerationId;
     }
 
-    public String consumerId() {
-        return consumerId;
+    public String memberId() {
+        return memberId;
     }
 
     public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 96e6ab0..48cb4c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -27,10 +27,10 @@ public class HeartbeatResponse extends AbstractRequestResponse {
     /**
      * Possible error code:
      *
-     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_GROUP (16)
      * ILLEGAL_GENERATION (22)
-     * UNKNOWN_CONSUMER_ID (25)
+     * UNKNOWN_MEMBER_ID (25)
      */
 
     private final short errorCode;

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 1ffe076..91a698c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -12,7 +12,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -29,42 +28,79 @@ public class JoinGroupRequest extends AbstractRequest {
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
+    private static final String MEMBER_ID_KEY_NAME = "member_id";
+    private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+    private static final String GROUP_PROTOCOLS_KEY_NAME = "group_protocols";
+    private static final String PROTOCOL_NAME_KEY_NAME = "protocol_name";
+    private static final String PROTOCOL_METADATA_KEY_NAME = "protocol_metadata";
 
-    public static final String UNKNOWN_CONSUMER_ID = "";
+    public static final String UNKNOWN_MEMBER_ID = "";
 
     private final String groupId;
     private final int sessionTimeout;
-    private final List<String> topics;
-    private final String consumerId;
-    private final String strategy;
+    private final String memberId;
+    private final String protocolType;
+    private final List<GroupProtocol> groupProtocols;
 
-    public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
+    public static class GroupProtocol {
+        private final String name;
+        private final ByteBuffer metadata;
+
+        public GroupProtocol(String name, ByteBuffer metadata) {
+            this.name = name;
+            this.metadata = metadata;
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public ByteBuffer metadata() {
+            return metadata;
+        }
+    }
+
+    public JoinGroupRequest(String groupId,
+                            int sessionTimeout,
+                            String memberId,
+                            String protocolType,
+                            List<GroupProtocol> groupProtocols) {
         super(new Struct(CURRENT_SCHEMA));
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
-        struct.set(TOPICS_KEY_NAME, topics.toArray());
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        struct.set(STRATEGY_KEY_NAME, strategy);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
+
+        List<Struct> groupProtocolsList = new ArrayList<>();
+        for (GroupProtocol protocol : groupProtocols) {
+            Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
+            protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
+            protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
+            groupProtocolsList.add(protocolStruct);
+        }
+
+        struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray());
         this.groupId = groupId;
         this.sessionTimeout = sessionTimeout;
-        this.topics = topics;
-        this.consumerId = consumerId;
-        this.strategy = strategy;
+        this.memberId = memberId;
+        this.protocolType = protocolType;
+        this.groupProtocols = groupProtocols;
     }
 
     public JoinGroupRequest(Struct struct) {
         super(struct);
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
-        Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
-        topics = new ArrayList<String>();
-        for (Object topic: topicsArray)
-            topics.add((String) topic);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-        strategy = struct.getString(STRATEGY_KEY_NAME);
+        memberId = struct.getString(MEMBER_ID_KEY_NAME);
+        protocolType = struct.getString(PROTOCOL_TYPE_KEY_NAME);
+
+        groupProtocols = new ArrayList<>();
+        for (Object groupProtocolObj : struct.getArray(GROUP_PROTOCOLS_KEY_NAME)) {
+            Struct groupProtocolStruct = (Struct) groupProtocolObj;
+            String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME);
+            ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME);
+            groupProtocols.add(new GroupProtocol(name, metadata));
+        }
     }
 
     @Override
@@ -74,8 +110,10 @@ public class JoinGroupRequest extends AbstractRequest {
                 return new JoinGroupResponse(
                         Errors.forException(e).code(),
                         JoinGroupResponse.UNKNOWN_GENERATION_ID,
-                        JoinGroupResponse.UNKNOWN_CONSUMER_ID,
-                        Collections.<TopicPartition>emptyList());
+                        JoinGroupResponse.UNKNOWN_PROTOCOL,
+                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+                        Collections.<String, ByteBuffer>emptyMap());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
@@ -90,16 +128,16 @@ public class JoinGroupRequest extends AbstractRequest {
         return sessionTimeout;
     }
 
-    public List<String> topics() {
-        return topics;
+    public String memberId() {
+        return memberId;
     }
 
-    public String consumerId() {
-        return consumerId;
+    public List<GroupProtocol> groupProtocols() {
+        return groupProtocols;
     }
 
-    public String strategy() {
-        return strategy;
+    public String protocolType() {
+        return protocolType;
     }
 
     public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 7bf544e..c65a4bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -12,15 +12,16 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class JoinGroupResponse extends AbstractRequestResponse {
     
@@ -30,63 +31,78 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     /**
      * Possible error code:
      *
-     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_CONSUMER (16)
-     * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23)
-     * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
-     * UNKNOWN_CONSUMER_ID (25)
+     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_GROUP (16)
+     * INCONSISTENT_GROUP_PROTOCOL (23)
+     * UNKNOWN_MEMBER_ID (25)
      * INVALID_SESSION_TIMEOUT (26)
      */
 
-    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
+    private static final String GENERATION_ID_KEY_NAME = "generation_id";
+    private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol";
+    private static final String LEADER_ID_KEY_NAME = "leader_id";
+    private static final String MEMBER_ID_KEY_NAME = "member_id";
+    private static final String MEMBERS_KEY_NAME = "members";
 
+    private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
+
+    public static final String UNKNOWN_PROTOCOL = "";
     public static final int UNKNOWN_GENERATION_ID = -1;
-    public static final String UNKNOWN_CONSUMER_ID = "";
+    public static final String UNKNOWN_MEMBER_ID = "";
 
     private final short errorCode;
     private final int generationId;
-    private final String consumerId;
-    private final List<TopicPartition> assignedPartitions;
-
-    public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
+    private final String groupProtocol;
+    private final String memberId;
+    private final String leaderId;
+    private final Map<String, ByteBuffer> members;
+
+    public JoinGroupResponse(short errorCode,
+                             int generationId,
+                             String groupProtocol,
+                             String memberId,
+                             String leaderId,
+                             Map<String, ByteBuffer> groupMembers) {
         super(new Struct(CURRENT_SCHEMA));
 
-        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
-
         struct.set(ERROR_CODE_KEY_NAME, errorCode);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) {
-            Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray());
-            topicArray.add(topicData);
+        struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.set(LEADER_ID_KEY_NAME, leaderId);
+
+        List<Struct> memberArray = new ArrayList<>();
+        for (Map.Entry<String, ByteBuffer> entries: groupMembers.entrySet()) {
+            Struct memberData = struct.instance(MEMBERS_KEY_NAME);
+            memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
+            memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue());
+            memberArray.add(memberData);
         }
-        struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray());
+        struct.set(MEMBERS_KEY_NAME, memberArray.toArray());
 
         this.errorCode = errorCode;
         this.generationId = generationId;
-        this.consumerId = consumerId;
-        this.assignedPartitions = assignedPartitions;
+        this.groupProtocol = groupProtocol;
+        this.memberId = memberId;
+        this.leaderId = leaderId;
+        this.members = groupMembers;
     }
 
     public JoinGroupResponse(Struct struct) {
         super(struct);
-        assignedPartitions = new ArrayList<TopicPartition>();
-        for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.getString(TOPIC_KEY_NAME);
-            for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME))
-                assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
+        members = new HashMap<>();
+
+        for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
+            Struct memberData = (Struct) memberDataObj;
+            String memberId = memberData.getString(MEMBER_ID_KEY_NAME);
+            ByteBuffer memberMetadata = memberData.getBytes(MEMBER_METADATA_KEY_NAME);
+            members.put(memberId, memberMetadata);
         }
         errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
         generationId = struct.getInt(GENERATION_ID_KEY_NAME);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME);
+        memberId = struct.getString(MEMBER_ID_KEY_NAME);
+        leaderId = struct.getString(LEADER_ID_KEY_NAME);
     }
 
     public short errorCode() {
@@ -97,12 +113,24 @@ public class JoinGroupResponse extends AbstractRequestResponse {
         return generationId;
     }
 
-    public String consumerId() {
-        return consumerId;
+    public String groupProtocol() {
+        return groupProtocol;
+    }
+
+    public String memberId() {
+        return memberId;
+    }
+
+    public String leaderId() {
+        return leaderId;
+    }
+
+    public boolean isLeader() {
+        return memberId.equals(leaderId);
     }
 
-    public List<TopicPartition> assignedPartitions() {
-        return assignedPartitions;
+    public Map<String, ByteBuffer> members() {
+        return members;
     }
 
     public static JoinGroupResponse parse(ByteBuffer buffer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 03df1e7..8721efa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -34,7 +34,7 @@ public class OffsetCommitRequest extends AbstractRequest {
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String MEMBER_ID_KEY_NAME = "member_id";
     private static final String TOPICS_KEY_NAME = "topics";
     private static final String RETENTION_TIME_KEY_NAME = "retention_time";
 
@@ -52,7 +52,7 @@ public class OffsetCommitRequest extends AbstractRequest {
 
     // default values for the current version
     public static final int DEFAULT_GENERATION_ID = -1;
-    public static final String DEFAULT_CONSUMER_ID = "";
+    public static final String DEFAULT_MEMBER_ID = "";
     public static final long DEFAULT_RETENTION_TIME = -1L;
 
     // default values for old versions,
@@ -61,7 +61,7 @@ public class OffsetCommitRequest extends AbstractRequest {
     public static final long DEFAULT_TIMESTAMP = -1L;            // for V0, V1
 
     private final String groupId;
-    private final String consumerId;
+    private final String memberId;
     private final int generationId;
     private final long retentionTime;
     private final Map<TopicPartition, PartitionData> offsetData;
@@ -97,7 +97,7 @@ public class OffsetCommitRequest extends AbstractRequest {
         initCommonFields(groupId, offsetData);
         this.groupId = groupId;
         this.generationId = DEFAULT_GENERATION_ID;
-        this.consumerId = DEFAULT_CONSUMER_ID;
+        this.memberId = DEFAULT_MEMBER_ID;
         this.retentionTime = DEFAULT_RETENTION_TIME;
         this.offsetData = offsetData;
     }
@@ -106,19 +106,19 @@ public class OffsetCommitRequest extends AbstractRequest {
      * Constructor for version 1.
      * @param groupId
      * @param generationId
-     * @param consumerId
+     * @param memberId
      * @param offsetData
      */
     @Deprecated
-    public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
+    public OffsetCommitRequest(String groupId, int generationId, String memberId, Map<TopicPartition, PartitionData> offsetData) {
         super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)));
 
         initCommonFields(groupId, offsetData);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
         this.groupId = groupId;
         this.generationId = generationId;
-        this.consumerId = consumerId;
+        this.memberId = memberId;
         this.retentionTime = DEFAULT_RETENTION_TIME;
         this.offsetData = offsetData;
     }
@@ -127,20 +127,20 @@ public class OffsetCommitRequest extends AbstractRequest {
      * Constructor for version 2.
      * @param groupId
      * @param generationId
-     * @param consumerId
+     * @param memberId
      * @param retentionTime
      * @param offsetData
      */
-    public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
+    public OffsetCommitRequest(String groupId, int generationId, String memberId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
         super(new Struct(CURRENT_SCHEMA));
 
         initCommonFields(groupId, offsetData);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
         struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
         this.groupId = groupId;
         this.generationId = generationId;
-        this.consumerId = consumerId;
+        this.memberId = memberId;
         this.retentionTime = retentionTime;
         this.offsetData = offsetData;
     }
@@ -183,10 +183,10 @@ public class OffsetCommitRequest extends AbstractRequest {
             generationId = DEFAULT_GENERATION_ID;
 
         // This field only exists in v1.
-        if (struct.hasField(CONSUMER_ID_KEY_NAME))
-            consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        if (struct.hasField(MEMBER_ID_KEY_NAME))
+            memberId = struct.getString(MEMBER_ID_KEY_NAME);
         else
-            consumerId = DEFAULT_CONSUMER_ID;
+            memberId = DEFAULT_MEMBER_ID;
 
         // This field only exists in v2
         if (struct.hasField(RETENTION_TIME_KEY_NAME))
@@ -243,8 +243,8 @@ public class OffsetCommitRequest extends AbstractRequest {
         return generationId;
     }
 
-    public String consumerId() {
-        return consumerId;
+    public String memberId() {
+        return memberId;
     }
 
     public long retentionTime() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index a163333..dae9c37 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -42,10 +42,10 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
      * Possible error code:
      *
      * OFFSET_METADATA_TOO_LARGE (12)
-     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_GROUP (16)
      * ILLEGAL_GENERATION (22)
-     * UNKNOWN_CONSUMER_ID (25)
+     * UNKNOWN_MEMBER_ID (25)
      * COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
      * INVALID_COMMIT_OFFSET_SIZE (28)
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 3dc8521..09ac74a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -49,9 +49,9 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
      *
      *  UNKNOWN_TOPIC_OR_PARTITION (3)  <- only for request v0
      *  OFFSET_LOAD_IN_PROGRESS (14)
-     *  NOT_COORDINATOR_FOR_CONSUMER (16)
+     *  NOT_COORDINATOR_FOR_GROUP (16)
      *  ILLEGAL_GENERATION (22)
-     *  UNKNOWN_CONSUMER_ID (25)
+     *  UNKNOWN_MEMBER_ID (25)
      */
 
     private final Map<TopicPartition, PartitionData> responseData;

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
new file mode 100644
index 0000000..606584b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SyncGroupRequest extends AbstractRequest {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.SYNC_GROUP.id);
+    public static final String GROUP_ID_KEY_NAME = "group_id";
+    public static final String GENERATION_ID_KEY_NAME = "generation_id";
+    public static final String MEMBER_ID_KEY_NAME = "member_id";
+    public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+    public static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment";
+
+    private final String groupId;
+    private final int generationId;
+    private final String memberId;
+    private final Map<String, ByteBuffer> groupAssignment;
+
+    public SyncGroupRequest(String groupId,
+                            int generationId,
+                            String memberId,
+                            Map<String, ByteBuffer> groupAssignment) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
+
+        List<Struct> memberArray = new ArrayList<>();
+        for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) {
+            Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME);
+            memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
+            memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue());
+            memberArray.add(memberData);
+        }
+        struct.set(GROUP_ASSIGNMENT_KEY_NAME, memberArray.toArray());
+
+        this.groupId = groupId;
+        this.generationId = generationId;
+        this.memberId = memberId;
+        this.groupAssignment = groupAssignment;
+    }
+
+    public SyncGroupRequest(Struct struct) {
+        super(struct);
+        this.groupId = struct.getString(GROUP_ID_KEY_NAME);
+        this.generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+        this.memberId = struct.getString(MEMBER_ID_KEY_NAME);
+
+        groupAssignment = new HashMap<>();
+
+        for (Object memberDataObj : struct.getArray(GROUP_ASSIGNMENT_KEY_NAME)) {
+            Struct memberData = (Struct) memberDataObj;
+            String memberId = memberData.getString(MEMBER_ID_KEY_NAME);
+            ByteBuffer memberMetadata = memberData.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
+            groupAssignment.put(memberId, memberMetadata);
+        }
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new SyncGroupResponse(
+                        Errors.forException(e).code(),
+                        ByteBuffer.wrap(new byte[]{}));
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
+        }
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int generationId() {
+        return generationId;
+    }
+
+    public Map<String, ByteBuffer> groupAssignment() {
+        return groupAssignment;
+    }
+
+    public String memberId() {
+        return memberId;
+    }
+
+    public static SyncGroupRequest parse(ByteBuffer buffer, int versionId) {
+        return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
new file mode 100644
index 0000000..a96b7e5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class SyncGroupResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SYNC_GROUP.id);
+    public static final String ERROR_CODE_KEY_NAME = "error_code";
+    public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+
+    /**
+     * Possible error codes:
+     *
+     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_GROUP (16)
+     * ILLEGAL_GENERATION (22)
+     * UNKNOWN_MEMBER_ID (25)
+     * REBALANCE_IN_PROGRESS (30)
+     *
+     */
+
+    private final short errorCode;
+    private final ByteBuffer memberState;
+
+    public SyncGroupResponse(short errorCode, ByteBuffer memberState) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
+
+        this.errorCode = errorCode;
+        this.memberState = memberState;
+    }
+
+    public SyncGroupResponse(Struct struct) {
+        super(struct);
+
+        this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public ByteBuffer memberAssignment() {
+        return memberState;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a7a2968..bc0e645 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -23,6 +23,7 @@ import java.io.StringWriter;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -52,6 +53,18 @@ public class Utils {
     private static final Logger log = LoggerFactory.getLogger(Utils.class);
 
     /**
+     * Get a sorted list representation of a collection.
+     * @param collection The collection to sort
+     * @param <T> The class of objects in the collection
+     * @return An unmodifiable sorted list with the contents of the collection
+     */
+    public static <T extends Comparable<? super T>> List<T> sorted(Collection<T> collection) {
+        List<T> res = new ArrayList<>(collection);
+        Collections.sort(res);
+        return Collections.unmodifiableList(res);
+    }
+
+    /**
      * Turn the given UTF8 byte array into a string
      *
      * @param bytes The byte array
@@ -114,6 +127,21 @@ public class Utils {
     }
 
     /**
+     * Get the little-endian value of an integer as a byte array.
+     * @param val The value to convert to a litte-endian array
+     * @return The little-endian encoded array of bytes for the value
+     */
+    public static byte[] toArrayLE(int val) {
+        return new byte[] {
+            (byte) (val >> 8 * 0),
+            (byte) (val >> 8 * 1),
+            (byte) (val >> 8 * 2),
+            (byte) (val >> 8 * 3)
+        };
+    }
+
+
+    /**
      * Read an unsigned integer stored in little-endian format from a byte array
      * at a given offset.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index b7160a1..55d7608 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -195,7 +195,6 @@ public class MetadataTest {
             new HashSet<>(Arrays.asList("topic", "topic1")), topics);
     }
 
-
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
new file mode 100644
index 0000000..13cce13
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RangeAssignorTest {
+
+    private RangeAssignor assignor = new RangeAssignor();
+
+
+    @Test
+    public void testOneConsumerNoTopic() {
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Collections.<String>emptyList()));
+
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertTrue(assignment.get(consumerId).isEmpty());
+    }
+
+    @Test
+    public void testOneConsumerNonexistentTopic() {
+        String topic = "topic";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 0);
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertTrue(assignment.get(consumerId).isEmpty());
+    }
+
+    @Test
+    public void testOneConsumerOneTopic() {
+        String topic = "topic";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertAssignment(Arrays.asList(
+                new TopicPartition(topic, 0),
+                new TopicPartition(topic, 1),
+                new TopicPartition(topic, 2)), assignment.get(consumerId));
+    }
+
+    @Test
+    public void testOnlyAssignsPartitionsFromSubscribedTopics() {
+        String topic = "topic";
+        String otherTopic = "other";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        partitionsPerTopic.put(otherTopic, 3);
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertAssignment(Arrays.asList(
+                new TopicPartition(topic, 0),
+                new TopicPartition(topic, 1),
+                new TopicPartition(topic, 2)), assignment.get(consumerId));
+    }
+
+    @Test
+    public void testOneConsumerMultipleTopics() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 1);
+        partitionsPerTopic.put(topic2, 2);
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Arrays.asList(topic1, topic2)));
+
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertAssignment(Arrays.asList(
+                new TopicPartition(topic1, 0),
+                new TopicPartition(topic2, 0),
+                new TopicPartition(topic2, 1)), assignment.get(consumerId));
+    }
+
+    @Test
+    public void testTwoConsumersOneTopicOnePartition() {
+        String topic = "topic";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 1);
+
+        Map<String, List<String>> consumers = new HashMap<>();
+        consumers.put(consumer1, Arrays.asList(topic));
+        consumers.put(consumer2, Arrays.asList(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1));
+        assertAssignment(Collections.<TopicPartition>emptyList(), assignment.get(consumer2));
+    }
+
+
+    @Test
+    public void testTwoConsumersOneTopicTwoPartitions() {
+        String topic = "topic";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+
+        Map<String, List<String>> consumers = new HashMap<>();
+        consumers.put(consumer1, Arrays.asList(topic));
+        consumers.put(consumer2, Arrays.asList(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1));
+        assertAssignment(Arrays.asList(new TopicPartition(topic, 1)), assignment.get(consumer2));
+    }
+
+    @Test
+    public void testMultipleConsumersMixedTopics() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+        String consumer3 = "consumer3";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 2);
+
+        Map<String, List<String>> consumers = new HashMap<>();
+        consumers.put(consumer1, Arrays.asList(topic1));
+        consumers.put(consumer2, Arrays.asList(topic1, topic2));
+        consumers.put(consumer3, Arrays.asList(topic1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertAssignment(Arrays.asList(
+                new TopicPartition(topic1, 0)), assignment.get(consumer1));
+        assertAssignment(Arrays.asList(
+                new TopicPartition(topic1, 1),
+                new TopicPartition(topic2, 0),
+                new TopicPartition(topic2, 1)), assignment.get(consumer2));
+        assertAssignment(Arrays.asList(
+                new TopicPartition(topic1, 2)), assignment.get(consumer3));
+    }
+
+    @Test
+    public void testTwoConsumersTwoTopicsSixPartitions() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 3);
+
+        Map<String, List<String>> consumers = new HashMap<>();
+        consumers.put(consumer1, Arrays.asList(topic1, topic2));
+        consumers.put(consumer2, Arrays.asList(topic1, topic2));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertAssignment(Arrays.asList(
+                new TopicPartition(topic1, 0),
+                new TopicPartition(topic1, 1),
+                new TopicPartition(topic2, 0),
+                new TopicPartition(topic2, 1)), assignment.get(consumer1));
+        assertAssignment(Arrays.asList(
+                new TopicPartition(topic1, 2),
+                new TopicPartition(topic2, 2)), assignment.get(consumer2));
+    }
+
+    private void assertAssignment(List<TopicPartition> expected, List<TopicPartition> actual) {
+        // order doesn't matter for assignment, so convert to a set
+        assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
new file mode 100644
index 0000000..31598cd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RoundRobinAssignorTest {
+
+    private RoundRobinAssignor assignor = new RoundRobinAssignor();
+
+
+    @Test
+    public void testOneConsumerNoTopic() {
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Collections.<String>emptyList()));
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertTrue(assignment.get(consumerId).isEmpty());
+    }
+
+    @Test
+    public void testOneConsumerNonexistentTopic() {
+        String topic = "topic";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 0);
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertTrue(assignment.get(consumerId).isEmpty());
+    }
+
+    @Test
+    public void testOneConsumerOneTopic() {
+        String topic = "topic";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+        assertEquals(Arrays.asList(
+                new TopicPartition(topic, 0),
+                new TopicPartition(topic, 1),
+                new TopicPartition(topic, 2)), assignment.get(consumerId));
+    }
+
+    @Test
+    public void testOnlyAssignsPartitionsFromSubscribedTopics() {
+        String topic = "topic";
+        String otherTopic = "other";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        partitionsPerTopic.put(otherTopic, 3);
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+        assertEquals(Arrays.asList(
+                new TopicPartition(topic, 0),
+                new TopicPartition(topic, 1),
+                new TopicPartition(topic, 2)), assignment.get(consumerId));
+    }
+
+    @Test
+    public void testOneConsumerMultipleTopics() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 1);
+        partitionsPerTopic.put(topic2, 2);
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+                Collections.singletonMap(consumerId, Arrays.asList(topic1, topic2)));
+        assertEquals(Arrays.asList(
+                new TopicPartition(topic1, 0),
+                new TopicPartition(topic2, 0),
+                new TopicPartition(topic2, 1)), assignment.get(consumerId));
+    }
+
+    @Test
+    public void testTwoConsumersOneTopicOnePartition() {
+        String topic = "topic";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 1);
+
+        Map<String, List<String>> consumers = new HashMap<>();
+        consumers.put(consumer1, Arrays.asList(topic));
+        consumers.put(consumer2, Arrays.asList(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertEquals(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1));
+        assertEquals(Collections.<TopicPartition>emptyList(), assignment.get(consumer2));
+    }
+
+    @Test
+    public void testTwoConsumersOneTopicTwoPartitions() {
+        String topic = "topic";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+
+        Map<String, List<String>> consumers = new HashMap<>();
+        consumers.put(consumer1, Arrays.asList(topic));
+        consumers.put(consumer2, Arrays.asList(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertEquals(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1));
+        assertEquals(Arrays.asList(new TopicPartition(topic, 1)), assignment.get(consumer2));
+    }
+
+    @Test
+    public void testMultipleConsumersMixedTopics() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+        String consumer3 = "consumer3";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 2);
+
+        Map<String, List<String>> consumers = new HashMap<>();
+        consumers.put(consumer1, Arrays.asList(topic1));
+        consumers.put(consumer2, Arrays.asList(topic1, topic2));
+        consumers.put(consumer3, Arrays.asList(topic1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertEquals(Arrays.asList(
+                new TopicPartition(topic1, 0)), assignment.get(consumer1));
+        assertEquals(Arrays.asList(
+                new TopicPartition(topic1, 1),
+                new TopicPartition(topic2, 0),
+                new TopicPartition(topic2, 1)), assignment.get(consumer2));
+        assertEquals(Arrays.asList(
+                new TopicPartition(topic1, 2)), assignment.get(consumer3));
+    }
+
+    @Test
+    public void testTwoConsumersTwoTopicsSixPartitions() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 3);
+
+        Map<String, List<String>> consumers = new HashMap<>();
+        consumers.put(consumer1, Arrays.asList(topic1, topic2));
+        consumers.put(consumer2, Arrays.asList(topic1, topic2));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertEquals(Arrays.asList(
+                new TopicPartition(topic1, 0),
+                new TopicPartition(topic1, 2),
+                new TopicPartition(topic2, 1)), assignment.get(consumer1));
+        assertEquals(Arrays.asList(
+                new TopicPartition(topic1, 1),
+                new TopicPartition(topic2, 0),
+                new TopicPartition(topic2, 2)), assignment.get(consumer2));
+    }
+
+    public static List<String> topics(String... topics) {
+        return Arrays.asList(topics);
+    }
+
+    public static TopicPartition tp(String topic, int partition) {
+        return new TopicPartition(topic, partition);
+    }
+
+}