You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:50 UTC
[6/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 9f8e981..36094b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -20,7 +20,12 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
-import static org.apache.kafka.common.protocol.types.Type.*;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
public class Protocol {
@@ -180,31 +185,31 @@ public class Protocol {
public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
STRING,
- "The consumer group id."),
+ "The group id."),
new Field("topics",
new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
"Topics to commit offsets."));
public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
STRING,
- "The consumer group id."),
+ "The group id."),
new Field("group_generation_id",
INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
+ "The generation of the group."),
+ new Field("member_id",
STRING,
- "The consumer id assigned by the group coordinator."),
+ "The member id assigned by the group coordinator."),
new Field("topics",
new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
"Topics to commit offsets."));
public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
STRING,
- "The consumer group id."),
+ "The group id."),
new Field("group_generation_id",
INT32,
"The generation of the consumer group."),
- new Field("consumer_id",
+ new Field("member_id",
STRING,
"The consumer id assigned by the group coordinator."),
new Field("retention_time",
@@ -384,17 +389,17 @@ public class Protocol {
public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};
/* Consumer metadata api */
- public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."));
+ public static final Schema GROUP_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The unique group id."));
- public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
- new Field("coordinator",
- BROKER,
- "Host and port information for the coordinator for a consumer group."));
+ public static final Schema GROUP_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+ new Field("coordinator",
+ BROKER,
+ "Host and port information for the coordinator for a consumer group."));
- public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
- public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
+ public static final Schema[] GROUP_METADATA_REQUEST = new Schema[] {GROUP_METADATA_REQUEST_V0};
+ public static final Schema[] GROUP_METADATA_RESPONSE = new Schema[] {GROUP_METADATA_RESPONSE_V0};
/* Controlled shutdown api */
public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
@@ -416,45 +421,67 @@ public class Protocol {
public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = new Schema[] {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
/* Join group api */
+ public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new Field("protocol_name", STRING),
+ new Field("protocol_metadata", BYTES));
+
public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
STRING,
- "The consumer group id."),
+ "The group id."),
new Field("session_timeout",
INT32,
"The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
- new Field("topics",
- new ArrayOf(STRING),
- "An array of topics to subscribe to."),
- new Field("consumer_id",
+ new Field("member_id",
STRING,
"The assigned consumer id or an empty string for a new consumer."),
- new Field("partition_assignment_strategy",
+ new Field("protocol_type",
STRING,
- "The strategy for the coordinator to assign partitions."));
+ "Unique name for class of protocols implemented by group"),
+ new Field("group_protocols",
+ new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
+ "List of protocols that the member supports"));
+
- public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partitions", new ArrayOf(INT32)));
+ public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING),
+ new Field("member_metadata", BYTES));
public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
- new Field("group_generation_id",
+ new Field("generation_id",
INT32,
"The generation of the consumer group."),
- new Field("consumer_id",
+ new Field("group_protocol",
+ STRING,
+ "The group protocol selected by the coordinator"),
+ new Field("leader_id",
+ STRING,
+ "The leader of the group"),
+ new Field("member_id",
STRING,
"The consumer id assigned by the group coordinator."),
- new Field("assigned_partitions",
- new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+ new Field("members",
+ new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
+ /* SyncGroup api */
+ public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),
+ new Field("member_assignment", BYTES));
+ public static final Schema SYNC_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING),
+ new Field("generation_id", INT32),
+ new Field("member_id", STRING),
+ new Field("group_assignment", new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
+ public static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+ new Field("member_assignment", BYTES));
+ public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0};
+ public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0};
+
/* Heartbeat api */
public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
new Field("group_generation_id",
INT32,
"The generation of the consumer group."),
- new Field("consumer_id",
+ new Field("member_id",
STRING,
- "The consumer id assigned by the group coordinator."));
+ "The member id assigned by the group coordinator."));
public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
@@ -589,10 +616,11 @@ public class Protocol {
REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST;
REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
- REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
+ REQUESTS[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_REQUEST;
REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST;
+ REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
@@ -605,10 +633,11 @@ public class Protocol {
RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE;
RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
- RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
+ RESPONSES[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_RESPONSE;
RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE;
+ RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE;
/* set the maximum version of each api */
for (ApiKeys api : ApiKeys.values())
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 095cd52..03e77a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -49,14 +49,16 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return OffsetCommitRequest.parse(buffer, versionId);
case OFFSET_FETCH:
return OffsetFetchRequest.parse(buffer, versionId);
- case CONSUMER_METADATA:
- return ConsumerMetadataRequest.parse(buffer, versionId);
+ case GROUP_METADATA:
+ return GroupMetadataRequest.parse(buffer, versionId);
case JOIN_GROUP:
return JoinGroupRequest.parse(buffer, versionId);
case HEARTBEAT:
return HeartbeatRequest.parse(buffer, versionId);
case LEAVE_GROUP:
return LeaveGroupRequest.parse(buffer, versionId);
+ case SYNC_GROUP:
+ return SyncGroupRequest.parse(buffer, versionId);
case STOP_REPLICA:
return StopReplicaRequest.parse(buffer, versionId);
case CONTROLLED_SHUTDOWN_KEY:
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
deleted file mode 100644
index 5b3e04a..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ConsumerMetadataRequest extends AbstractRequest {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
- private static final String GROUP_ID_KEY_NAME = "group_id";
-
- private final String groupId;
-
- public ConsumerMetadataRequest(String groupId) {
- super(new Struct(CURRENT_SCHEMA));
-
- struct.set(GROUP_ID_KEY_NAME, groupId);
- this.groupId = groupId;
- }
-
- public ConsumerMetadataRequest(Struct struct) {
- super(struct);
- groupId = struct.getString(GROUP_ID_KEY_NAME);
- }
-
- @Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- switch (versionId) {
- case 0:
- return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONSUMER_METADATA.id)));
- }
- }
-
- public String groupId() {
- return groupId;
- }
-
- public static ConsumerMetadataRequest parse(ByteBuffer buffer, int versionId) {
- return new ConsumerMetadataRequest(ProtoUtils.parseRequest(ApiKeys.CONSUMER_METADATA.id, versionId, buffer));
- }
-
- public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
- return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
deleted file mode 100644
index 0c250c3..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ConsumerMetadataResponse extends AbstractRequestResponse {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
- private static final String ERROR_CODE_KEY_NAME = "error_code";
- private static final String COORDINATOR_KEY_NAME = "coordinator";
-
- // coordinator level field names
- private static final String NODE_ID_KEY_NAME = "node_id";
- private static final String HOST_KEY_NAME = "host";
- private static final String PORT_KEY_NAME = "port";
-
- private final short errorCode;
- private final Node node;
-
- public ConsumerMetadataResponse(short errorCode, Node node) {
- super(new Struct(CURRENT_SCHEMA));
- struct.set(ERROR_CODE_KEY_NAME, errorCode);
- Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
- coordinator.set(NODE_ID_KEY_NAME, node.id());
- coordinator.set(HOST_KEY_NAME, node.host());
- coordinator.set(PORT_KEY_NAME, node.port());
- struct.set(COORDINATOR_KEY_NAME, coordinator);
- this.errorCode = errorCode;
- this.node = node;
- }
-
- public ConsumerMetadataResponse(Struct struct) {
- super(struct);
- errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
- Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
- int nodeId = broker.getInt(NODE_ID_KEY_NAME);
- String host = broker.getString(HOST_KEY_NAME);
- int port = broker.getInt(PORT_KEY_NAME);
- node = new Node(nodeId, host, port);
- }
-
- public short errorCode() {
- return errorCode;
- }
-
- public Node node() {
- return node;
- }
-
- public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
- return new ConsumerMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
new file mode 100644
index 0000000..fd54c5a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupMetadataRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_METADATA.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+
+ private final String groupId;
+
+ public GroupMetadataRequest(String groupId) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ this.groupId = groupId;
+ }
+
+ public GroupMetadataRequest(Struct struct) {
+ super(struct);
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ return new GroupMetadataResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_METADATA.id)));
+ }
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public static GroupMetadataRequest parse(ByteBuffer buffer, int versionId) {
+ return new GroupMetadataRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_METADATA.id, versionId, buffer));
+ }
+
+ public static GroupMetadataRequest parse(ByteBuffer buffer) {
+ return new GroupMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
new file mode 100644
index 0000000..a5ef478
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupMetadataResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_METADATA.id);
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String COORDINATOR_KEY_NAME = "coordinator";
+
+ // coordinator level field names
+ private static final String NODE_ID_KEY_NAME = "node_id";
+ private static final String HOST_KEY_NAME = "host";
+ private static final String PORT_KEY_NAME = "port";
+
+ private final short errorCode;
+ private final Node node;
+
+ public GroupMetadataResponse(short errorCode, Node node) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+ coordinator.set(NODE_ID_KEY_NAME, node.id());
+ coordinator.set(HOST_KEY_NAME, node.host());
+ coordinator.set(PORT_KEY_NAME, node.port());
+ struct.set(COORDINATOR_KEY_NAME, coordinator);
+ this.errorCode = errorCode;
+ this.node = node;
+ }
+
+ public GroupMetadataResponse(Struct struct) {
+ super(struct);
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+ int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+ String host = broker.getString(HOST_KEY_NAME);
+ int port = broker.getInt(PORT_KEY_NAME);
+ node = new Node(nodeId, host, port);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public Node node() {
+ return node;
+ }
+
+ public static GroupMetadataResponse parse(ByteBuffer buffer) {
+ return new GroupMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 89719f1..74be3ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -25,27 +25,27 @@ public class HeartbeatRequest extends AbstractRequest {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
private static final String GROUP_ID_KEY_NAME = "group_id";
private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
- private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static final String MEMBER_ID_KEY_NAME = "member_id";
private final String groupId;
private final int groupGenerationId;
- private final String consumerId;
+ private final String memberId;
- public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
+ public HeartbeatRequest(String groupId, int groupGenerationId, String memberId) {
super(new Struct(CURRENT_SCHEMA));
struct.set(GROUP_ID_KEY_NAME, groupId);
struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
- struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ struct.set(MEMBER_ID_KEY_NAME, memberId);
this.groupId = groupId;
this.groupGenerationId = groupGenerationId;
- this.consumerId = consumerId;
+ this.memberId = memberId;
}
public HeartbeatRequest(Struct struct) {
super(struct);
groupId = struct.getString(GROUP_ID_KEY_NAME);
groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
- consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ memberId = struct.getString(MEMBER_ID_KEY_NAME);
}
@Override
@@ -67,8 +67,8 @@ public class HeartbeatRequest extends AbstractRequest {
return groupGenerationId;
}
- public String consumerId() {
- return consumerId;
+ public String memberId() {
+ return memberId;
}
public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 96e6ab0..48cb4c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -27,10 +27,10 @@ public class HeartbeatResponse extends AbstractRequestResponse {
/**
* Possible error code:
*
- * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR_FOR_CONSUMER (16)
+ * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
- * UNKNOWN_CONSUMER_ID (25)
+ * UNKNOWN_MEMBER_ID (25)
*/
private final short errorCode;
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 1ffe076..91a698c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -12,7 +12,6 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
@@ -29,42 +28,79 @@ public class JoinGroupRequest extends AbstractRequest {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
private static final String GROUP_ID_KEY_NAME = "group_id";
private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
- private static final String TOPICS_KEY_NAME = "topics";
- private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
- private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
+ private static final String MEMBER_ID_KEY_NAME = "member_id";
+ private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+ private static final String GROUP_PROTOCOLS_KEY_NAME = "group_protocols";
+ private static final String PROTOCOL_NAME_KEY_NAME = "protocol_name";
+ private static final String PROTOCOL_METADATA_KEY_NAME = "protocol_metadata";
- public static final String UNKNOWN_CONSUMER_ID = "";
+ public static final String UNKNOWN_MEMBER_ID = "";
private final String groupId;
private final int sessionTimeout;
- private final List<String> topics;
- private final String consumerId;
- private final String strategy;
+ private final String memberId;
+ private final String protocolType;
+ private final List<GroupProtocol> groupProtocols;
- public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
+ public static class GroupProtocol {
+ private final String name;
+ private final ByteBuffer metadata;
+
+ public GroupProtocol(String name, ByteBuffer metadata) {
+ this.name = name;
+ this.metadata = metadata;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public ByteBuffer metadata() {
+ return metadata;
+ }
+ }
+
+ public JoinGroupRequest(String groupId,
+ int sessionTimeout,
+ String memberId,
+ String protocolType,
+ List<GroupProtocol> groupProtocols) {
super(new Struct(CURRENT_SCHEMA));
struct.set(GROUP_ID_KEY_NAME, groupId);
struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
- struct.set(TOPICS_KEY_NAME, topics.toArray());
- struct.set(CONSUMER_ID_KEY_NAME, consumerId);
- struct.set(STRATEGY_KEY_NAME, strategy);
+ struct.set(MEMBER_ID_KEY_NAME, memberId);
+ struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
+
+ List<Struct> groupProtocolsList = new ArrayList<>();
+ for (GroupProtocol protocol : groupProtocols) {
+ Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
+ protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
+ protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
+ groupProtocolsList.add(protocolStruct);
+ }
+
+ struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray());
this.groupId = groupId;
this.sessionTimeout = sessionTimeout;
- this.topics = topics;
- this.consumerId = consumerId;
- this.strategy = strategy;
+ this.memberId = memberId;
+ this.protocolType = protocolType;
+ this.groupProtocols = groupProtocols;
}
public JoinGroupRequest(Struct struct) {
super(struct);
groupId = struct.getString(GROUP_ID_KEY_NAME);
sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
- Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
- topics = new ArrayList<String>();
- for (Object topic: topicsArray)
- topics.add((String) topic);
- consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
- strategy = struct.getString(STRATEGY_KEY_NAME);
+ memberId = struct.getString(MEMBER_ID_KEY_NAME);
+ protocolType = struct.getString(PROTOCOL_TYPE_KEY_NAME);
+
+ groupProtocols = new ArrayList<>();
+ for (Object groupProtocolObj : struct.getArray(GROUP_PROTOCOLS_KEY_NAME)) {
+ Struct groupProtocolStruct = (Struct) groupProtocolObj;
+ String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME);
+ ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME);
+ groupProtocols.add(new GroupProtocol(name, metadata));
+ }
}
@Override
@@ -74,8 +110,10 @@ public class JoinGroupRequest extends AbstractRequest {
return new JoinGroupResponse(
Errors.forException(e).code(),
JoinGroupResponse.UNKNOWN_GENERATION_ID,
- JoinGroupResponse.UNKNOWN_CONSUMER_ID,
- Collections.<TopicPartition>emptyList());
+ JoinGroupResponse.UNKNOWN_PROTOCOL,
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+ Collections.<String, ByteBuffer>emptyMap());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
@@ -90,16 +128,16 @@ public class JoinGroupRequest extends AbstractRequest {
return sessionTimeout;
}
- public List<String> topics() {
- return topics;
+ public String memberId() {
+ return memberId;
}
- public String consumerId() {
- return consumerId;
+ public List<GroupProtocol> groupProtocols() {
+ return groupProtocols;
}
- public String strategy() {
- return strategy;
+ public String protocolType() {
+ return protocolType;
}
public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 7bf544e..c65a4bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -12,15 +12,16 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class JoinGroupResponse extends AbstractRequestResponse {
@@ -30,63 +31,78 @@ public class JoinGroupResponse extends AbstractRequestResponse {
/**
* Possible error code:
*
- * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR_FOR_CONSUMER (16)
- * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23)
- * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
- * UNKNOWN_CONSUMER_ID (25)
+ * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_GROUP (16)
+ * INCONSISTENT_GROUP_PROTOCOL (23)
+ * UNKNOWN_MEMBER_ID (25)
* INVALID_SESSION_TIMEOUT (26)
*/
- private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
- private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
- private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITIONS_KEY_NAME = "partitions";
+ private static final String GENERATION_ID_KEY_NAME = "generation_id";
+ private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol";
+ private static final String LEADER_ID_KEY_NAME = "leader_id";
+ private static final String MEMBER_ID_KEY_NAME = "member_id";
+ private static final String MEMBERS_KEY_NAME = "members";
+ private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
+
+ public static final String UNKNOWN_PROTOCOL = "";
public static final int UNKNOWN_GENERATION_ID = -1;
- public static final String UNKNOWN_CONSUMER_ID = "";
+ public static final String UNKNOWN_MEMBER_ID = "";
private final short errorCode;
private final int generationId;
- private final String consumerId;
- private final List<TopicPartition> assignedPartitions;
-
- public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
+ private final String groupProtocol;
+ private final String memberId;
+ private final String leaderId;
+ private final Map<String, ByteBuffer> members;
+
+ public JoinGroupResponse(short errorCode,
+ int generationId,
+ String groupProtocol,
+ String memberId,
+ String leaderId,
+ Map<String, ByteBuffer> groupMembers) {
super(new Struct(CURRENT_SCHEMA));
- Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
-
struct.set(ERROR_CODE_KEY_NAME, errorCode);
struct.set(GENERATION_ID_KEY_NAME, generationId);
- struct.set(CONSUMER_ID_KEY_NAME, consumerId);
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) {
- Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, entries.getKey());
- topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray());
- topicArray.add(topicData);
+ struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol);
+ struct.set(MEMBER_ID_KEY_NAME, memberId);
+ struct.set(LEADER_ID_KEY_NAME, leaderId);
+
+ List<Struct> memberArray = new ArrayList<>();
+ for (Map.Entry<String, ByteBuffer> entries: groupMembers.entrySet()) {
+ Struct memberData = struct.instance(MEMBERS_KEY_NAME);
+ memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
+ memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue());
+ memberArray.add(memberData);
}
- struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray());
+ struct.set(MEMBERS_KEY_NAME, memberArray.toArray());
this.errorCode = errorCode;
this.generationId = generationId;
- this.consumerId = consumerId;
- this.assignedPartitions = assignedPartitions;
+ this.groupProtocol = groupProtocol;
+ this.memberId = memberId;
+ this.leaderId = leaderId;
+ this.members = groupMembers;
}
public JoinGroupResponse(Struct struct) {
super(struct);
- assignedPartitions = new ArrayList<TopicPartition>();
- for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
- Struct topicData = (Struct) topicDataObj;
- String topic = topicData.getString(TOPIC_KEY_NAME);
- for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME))
- assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
+ members = new HashMap<>();
+
+ for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
+ Struct memberData = (Struct) memberDataObj;
+ String memberId = memberData.getString(MEMBER_ID_KEY_NAME);
+ ByteBuffer memberMetadata = memberData.getBytes(MEMBER_METADATA_KEY_NAME);
+ members.put(memberId, memberMetadata);
}
errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
generationId = struct.getInt(GENERATION_ID_KEY_NAME);
- consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME);
+ memberId = struct.getString(MEMBER_ID_KEY_NAME);
+ leaderId = struct.getString(LEADER_ID_KEY_NAME);
}
public short errorCode() {
@@ -97,12 +113,24 @@ public class JoinGroupResponse extends AbstractRequestResponse {
return generationId;
}
- public String consumerId() {
- return consumerId;
+ public String groupProtocol() {
+ return groupProtocol;
+ }
+
+ public String memberId() {
+ return memberId;
+ }
+
+ public String leaderId() {
+ return leaderId;
+ }
+
+ public boolean isLeader() {
+ return memberId.equals(leaderId);
}
- public List<TopicPartition> assignedPartitions() {
- return assignedPartitions;
+ public Map<String, ByteBuffer> members() {
+ return members;
}
public static JoinGroupResponse parse(ByteBuffer buffer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 03df1e7..8721efa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -34,7 +34,7 @@ public class OffsetCommitRequest extends AbstractRequest {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
private static final String GROUP_ID_KEY_NAME = "group_id";
private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
- private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static final String MEMBER_ID_KEY_NAME = "member_id";
private static final String TOPICS_KEY_NAME = "topics";
private static final String RETENTION_TIME_KEY_NAME = "retention_time";
@@ -52,7 +52,7 @@ public class OffsetCommitRequest extends AbstractRequest {
// default values for the current version
public static final int DEFAULT_GENERATION_ID = -1;
- public static final String DEFAULT_CONSUMER_ID = "";
+ public static final String DEFAULT_MEMBER_ID = "";
public static final long DEFAULT_RETENTION_TIME = -1L;
// default values for old versions,
@@ -61,7 +61,7 @@ public class OffsetCommitRequest extends AbstractRequest {
public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1
private final String groupId;
- private final String consumerId;
+ private final String memberId;
private final int generationId;
private final long retentionTime;
private final Map<TopicPartition, PartitionData> offsetData;
@@ -97,7 +97,7 @@ public class OffsetCommitRequest extends AbstractRequest {
initCommonFields(groupId, offsetData);
this.groupId = groupId;
this.generationId = DEFAULT_GENERATION_ID;
- this.consumerId = DEFAULT_CONSUMER_ID;
+ this.memberId = DEFAULT_MEMBER_ID;
this.retentionTime = DEFAULT_RETENTION_TIME;
this.offsetData = offsetData;
}
@@ -106,19 +106,19 @@ public class OffsetCommitRequest extends AbstractRequest {
* Constructor for version 1.
* @param groupId
* @param generationId
- * @param consumerId
+ * @param memberId
* @param offsetData
*/
@Deprecated
- public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
+ public OffsetCommitRequest(String groupId, int generationId, String memberId, Map<TopicPartition, PartitionData> offsetData) {
super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)));
initCommonFields(groupId, offsetData);
struct.set(GENERATION_ID_KEY_NAME, generationId);
- struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ struct.set(MEMBER_ID_KEY_NAME, memberId);
this.groupId = groupId;
this.generationId = generationId;
- this.consumerId = consumerId;
+ this.memberId = memberId;
this.retentionTime = DEFAULT_RETENTION_TIME;
this.offsetData = offsetData;
}
@@ -127,20 +127,20 @@ public class OffsetCommitRequest extends AbstractRequest {
* Constructor for version 2.
* @param groupId
* @param generationId
- * @param consumerId
+ * @param memberId
* @param retentionTime
* @param offsetData
*/
- public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
+ public OffsetCommitRequest(String groupId, int generationId, String memberId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
super(new Struct(CURRENT_SCHEMA));
initCommonFields(groupId, offsetData);
struct.set(GENERATION_ID_KEY_NAME, generationId);
- struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ struct.set(MEMBER_ID_KEY_NAME, memberId);
struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
this.groupId = groupId;
this.generationId = generationId;
- this.consumerId = consumerId;
+ this.memberId = memberId;
this.retentionTime = retentionTime;
this.offsetData = offsetData;
}
@@ -183,10 +183,10 @@ public class OffsetCommitRequest extends AbstractRequest {
generationId = DEFAULT_GENERATION_ID;
// This field only exists in v1.
- if (struct.hasField(CONSUMER_ID_KEY_NAME))
- consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ if (struct.hasField(MEMBER_ID_KEY_NAME))
+ memberId = struct.getString(MEMBER_ID_KEY_NAME);
else
- consumerId = DEFAULT_CONSUMER_ID;
+ memberId = DEFAULT_MEMBER_ID;
// This field only exists in v2
if (struct.hasField(RETENTION_TIME_KEY_NAME))
@@ -243,8 +243,8 @@ public class OffsetCommitRequest extends AbstractRequest {
return generationId;
}
- public String consumerId() {
- return consumerId;
+ public String memberId() {
+ return memberId;
}
public long retentionTime() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index a163333..dae9c37 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -42,10 +42,10 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
* Possible error code:
*
* OFFSET_METADATA_TOO_LARGE (12)
- * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR_FOR_CONSUMER (16)
+ * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
- * UNKNOWN_CONSUMER_ID (25)
+ * UNKNOWN_MEMBER_ID (25)
* COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
* INVALID_COMMIT_OFFSET_SIZE (28)
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 3dc8521..09ac74a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -49,9 +49,9 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
*
* UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0
* OFFSET_LOAD_IN_PROGRESS (14)
- * NOT_COORDINATOR_FOR_CONSUMER (16)
+ * NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
- * UNKNOWN_CONSUMER_ID (25)
+ * UNKNOWN_MEMBER_ID (25)
*/
private final Map<TopicPartition, PartitionData> responseData;
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
new file mode 100644
index 0000000..606584b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SyncGroupRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.SYNC_GROUP.id);
+ public static final String GROUP_ID_KEY_NAME = "group_id";
+ public static final String GENERATION_ID_KEY_NAME = "generation_id";
+ public static final String MEMBER_ID_KEY_NAME = "member_id";
+ public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+ public static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment";
+
+ private final String groupId;
+ private final int generationId;
+ private final String memberId;
+ private final Map<String, ByteBuffer> groupAssignment;
+
+ public SyncGroupRequest(String groupId,
+ int generationId,
+ String memberId,
+ Map<String, ByteBuffer> groupAssignment) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ struct.set(GENERATION_ID_KEY_NAME, generationId);
+ struct.set(MEMBER_ID_KEY_NAME, memberId);
+
+ List<Struct> memberArray = new ArrayList<>();
+ for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) {
+ Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME);
+ memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
+ memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue());
+ memberArray.add(memberData);
+ }
+ struct.set(GROUP_ASSIGNMENT_KEY_NAME, memberArray.toArray());
+
+ this.groupId = groupId;
+ this.generationId = generationId;
+ this.memberId = memberId;
+ this.groupAssignment = groupAssignment;
+ }
+
+ public SyncGroupRequest(Struct struct) {
+ super(struct);
+ this.groupId = struct.getString(GROUP_ID_KEY_NAME);
+ this.generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+ this.memberId = struct.getString(MEMBER_ID_KEY_NAME);
+
+ groupAssignment = new HashMap<>();
+
+ for (Object memberDataObj : struct.getArray(GROUP_ASSIGNMENT_KEY_NAME)) {
+ Struct memberData = (Struct) memberDataObj;
+ String assignedMemberId = memberData.getString(MEMBER_ID_KEY_NAME); // per-entry id; kept distinct from the request-level this.memberId
+ ByteBuffer memberAssignment = memberData.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
+ groupAssignment.put(assignedMemberId, memberAssignment);
+ }
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0: // v0: error code derived from the exception plus an empty member assignment
+ return new SyncGroupResponse(
+ Errors.forException(e).code(),
+ ByteBuffer.wrap(new byte[]{}));
+ default: // any other version is rejected; report the valid range for SYNC_GROUP, not JOIN_GROUP
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SYNC_GROUP.id)));
+ }
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public int generationId() {
+ return generationId;
+ }
+
+ public Map<String, ByteBuffer> groupAssignment() {
+ return groupAssignment;
+ }
+
+ public String memberId() {
+ return memberId;
+ }
+
+ public static SyncGroupRequest parse(ByteBuffer buffer, int versionId) {
+ return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
new file mode 100644
index 0000000..a96b7e5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class SyncGroupResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SYNC_GROUP.id);
+ public static final String ERROR_CODE_KEY_NAME = "error_code";
+ public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+
+ /**
+ * Possible error codes:
+ *
+ * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_GROUP (16)
+ * ILLEGAL_GENERATION (22)
+ * UNKNOWN_MEMBER_ID (25)
+ * REBALANCE_IN_PROGRESS (30)
+ *
+ */
+
+ private final short errorCode;
+ private final ByteBuffer memberState;
+
+ public SyncGroupResponse(short errorCode, ByteBuffer memberState) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
+
+ this.errorCode = errorCode;
+ this.memberState = memberState;
+ }
+
+ public SyncGroupResponse(Struct struct) {
+ super(struct);
+
+ this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public ByteBuffer memberAssignment() {
+ return memberState;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a7a2968..bc0e645 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -23,6 +23,7 @@ import java.io.StringWriter;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -52,6 +53,18 @@ public class Utils {
private static final Logger log = LoggerFactory.getLogger(Utils.class);
/**
+ * Get a sorted list representation of a collection.
+ * @param collection The collection to sort
+ * @param <T> The class of objects in the collection
+ * @return An unmodifiable sorted list with the contents of the collection
+ */
+ public static <T extends Comparable<? super T>> List<T> sorted(Collection<T> collection) {
+ List<T> res = new ArrayList<>(collection);
+ Collections.sort(res);
+ return Collections.unmodifiableList(res);
+ }
+
+ /**
* Turn the given UTF8 byte array into a string
*
* @param bytes The byte array
@@ -114,6 +127,21 @@ public class Utils {
}
/**
+ * Get the little-endian value of an integer as a byte array.
+ * @param val The value to convert to a little-endian array
+ * @return The little-endian encoded array of bytes for the value
+ */
+ public static byte[] toArrayLE(int val) {
+ return new byte[] {
+ (byte) (val >> 8 * 0),
+ (byte) (val >> 8 * 1),
+ (byte) (val >> 8 * 2),
+ (byte) (val >> 8 * 3)
+ };
+ }
+
+
+ /**
* Read an unsigned integer stored in little-endian format from a byte array
* at a given offset.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index b7160a1..55d7608 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -195,7 +195,6 @@ public class MetadataTest {
new HashSet<>(Arrays.asList("topic", "topic1")), topics);
}
-
private Thread asyncFetch(final String topic) {
Thread thread = new Thread() {
public void run() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
new file mode 100644
index 0000000..13cce13
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RangeAssignorTest {
+
+ private RangeAssignor assignor = new RangeAssignor();
+
+
+ @Test
+ public void testOneConsumerNoTopic() {
+ String consumerId = "consumer";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+ Collections.singletonMap(consumerId, Collections.<String>emptyList()));
+
+ assertEquals(Collections.singleton(consumerId), assignment.keySet());
+ assertTrue(assignment.get(consumerId).isEmpty());
+ }
+
+ @Test
+ public void testOneConsumerNonexistentTopic() {
+ String topic = "topic";
+ String consumerId = "consumer";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 0);
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+ Collections.singletonMap(consumerId, Arrays.asList(topic)));
+ assertEquals(Collections.singleton(consumerId), assignment.keySet());
+ assertTrue(assignment.get(consumerId).isEmpty());
+ }
+
+ @Test
+ public void testOneConsumerOneTopic() {
+ String topic = "topic";
+ String consumerId = "consumer";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+ Collections.singletonMap(consumerId, Arrays.asList(topic)));
+
+ assertEquals(Collections.singleton(consumerId), assignment.keySet());
+ assertAssignment(Arrays.asList(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(topic, 2)), assignment.get(consumerId));
+ }
+
+ @Test
+ public void testOnlyAssignsPartitionsFromSubscribedTopics() {
+ String topic = "topic";
+ String otherTopic = "other";
+ String consumerId = "consumer";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+ partitionsPerTopic.put(otherTopic, 3);
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+ Collections.singletonMap(consumerId, Arrays.asList(topic)));
+ assertEquals(Collections.singleton(consumerId), assignment.keySet());
+ assertAssignment(Arrays.asList(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(topic, 2)), assignment.get(consumerId));
+ }
+
+ @Test
+ public void testOneConsumerMultipleTopics() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumerId = "consumer";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 1);
+ partitionsPerTopic.put(topic2, 2);
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
+ Collections.singletonMap(consumerId, Arrays.asList(topic1, topic2)));
+
+ assertEquals(Collections.singleton(consumerId), assignment.keySet());
+ assertAssignment(Arrays.asList(
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic2, 0),
+ new TopicPartition(topic2, 1)), assignment.get(consumerId));
+ }
+
+ @Test
+ public void testTwoConsumersOneTopicOnePartition() {
+ String topic = "topic";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 1);
+
+ Map<String, List<String>> consumers = new HashMap<>();
+ consumers.put(consumer1, Arrays.asList(topic));
+ consumers.put(consumer2, Arrays.asList(topic));
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+ assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1));
+ assertAssignment(Collections.<TopicPartition>emptyList(), assignment.get(consumer2));
+ }
+
+
+ @Test
+ public void testTwoConsumersOneTopicTwoPartitions() {
+ String topic = "topic";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 2);
+
+ Map<String, List<String>> consumers = new HashMap<>();
+ consumers.put(consumer1, Arrays.asList(topic));
+ consumers.put(consumer2, Arrays.asList(topic));
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+ assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1));
+ assertAssignment(Arrays.asList(new TopicPartition(topic, 1)), assignment.get(consumer2));
+ }
+
+ @Test
+ public void testMultipleConsumersMixedTopics() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+ String consumer3 = "consumer3";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 3);
+ partitionsPerTopic.put(topic2, 2);
+
+ Map<String, List<String>> consumers = new HashMap<>();
+ consumers.put(consumer1, Arrays.asList(topic1));
+ consumers.put(consumer2, Arrays.asList(topic1, topic2));
+ consumers.put(consumer3, Arrays.asList(topic1));
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+ assertAssignment(Arrays.asList(
+ new TopicPartition(topic1, 0)), assignment.get(consumer1));
+ assertAssignment(Arrays.asList(
+ new TopicPartition(topic1, 1),
+ new TopicPartition(topic2, 0),
+ new TopicPartition(topic2, 1)), assignment.get(consumer2));
+ assertAssignment(Arrays.asList(
+ new TopicPartition(topic1, 2)), assignment.get(consumer3));
+ }
+
+ @Test
+ public void testTwoConsumersTwoTopicsSixPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 3);
+ partitionsPerTopic.put(topic2, 3);
+
+ Map<String, List<String>> consumers = new HashMap<>();
+ consumers.put(consumer1, Arrays.asList(topic1, topic2));
+ consumers.put(consumer2, Arrays.asList(topic1, topic2));
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+ assertAssignment(Arrays.asList(
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1),
+ new TopicPartition(topic2, 0),
+ new TopicPartition(topic2, 1)), assignment.get(consumer1));
+ assertAssignment(Arrays.asList(
+ new TopicPartition(topic1, 2),
+ new TopicPartition(topic2, 2)), assignment.get(consumer2));
+ }
+
+ private void assertAssignment(List<TopicPartition> expected, List<TopicPartition> actual) {
+ // order doesn't matter for assignment, so convert to a set
+ assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
new file mode 100644
index 0000000..31598cd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link RoundRobinAssignor#assign}: a single consumer with zero, one or several topics,
+ * several consumers sharing a topic, and consumers whose subscriptions differ.
+ */
+public class RoundRobinAssignorTest {
+
+    private final RoundRobinAssignor assignor = new RoundRobinAssignor();
+
+    /** A consumer with an empty subscription still appears in the result, with no partitions. */
+    @Test
+    public void testOneConsumerNoTopic() {
+        String member = "consumer";
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        Map<String, List<String>> subscriptions =
+                Collections.singletonMap(member, Collections.<String>emptyList());
+
+        Map<String, List<TopicPartition>> result = assignor.assign(partitionCounts, subscriptions);
+
+        assertEquals(Collections.singleton(member), result.keySet());
+        assertTrue(result.get(member).isEmpty());
+    }
+
+    /** A subscribed topic reported with zero partitions yields an empty assignment. */
+    @Test
+    public void testOneConsumerNonexistentTopic() {
+        String subscribed = "topic";
+        String member = "consumer";
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(subscribed, 0);
+        Map<String, List<String>> subscriptions = Collections.singletonMap(member, topics(subscribed));
+
+        Map<String, List<TopicPartition>> result = assignor.assign(partitionCounts, subscriptions);
+
+        assertEquals(Collections.singleton(member), result.keySet());
+        assertTrue(result.get(member).isEmpty());
+    }
+
+    /** A lone consumer receives every partition of its single topic, in partition order. */
+    @Test
+    public void testOneConsumerOneTopic() {
+        String subscribed = "topic";
+        String member = "consumer";
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(subscribed, 3);
+        Map<String, List<String>> subscriptions = Collections.singletonMap(member, topics(subscribed));
+
+        Map<String, List<TopicPartition>> result = assignor.assign(partitionCounts, subscriptions);
+
+        List<TopicPartition> expected = Arrays.asList(
+                tp(subscribed, 0), tp(subscribed, 1), tp(subscribed, 2));
+        assertEquals(expected, result.get(member));
+    }
+
+    /** Partitions of a topic the consumer did not subscribe to are left out of its assignment. */
+    @Test
+    public void testOnlyAssignsPartitionsFromSubscribedTopics() {
+        String subscribed = "topic";
+        String unsubscribed = "other";
+        String member = "consumer";
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(subscribed, 3);
+        partitionCounts.put(unsubscribed, 3);
+        Map<String, List<String>> subscriptions = Collections.singletonMap(member, topics(subscribed));
+
+        Map<String, List<TopicPartition>> result = assignor.assign(partitionCounts, subscriptions);
+
+        List<TopicPartition> expected = Arrays.asList(
+                tp(subscribed, 0), tp(subscribed, 1), tp(subscribed, 2));
+        assertEquals(expected, result.get(member));
+    }
+
+    /** A lone consumer subscribed to two topics receives all partitions of both. */
+    @Test
+    public void testOneConsumerMultipleTopics() {
+        String firstTopic = "topic1";
+        String secondTopic = "topic2";
+        String member = "consumer";
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(firstTopic, 1);
+        partitionCounts.put(secondTopic, 2);
+        Map<String, List<String>> subscriptions =
+                Collections.singletonMap(member, topics(firstTopic, secondTopic));
+
+        Map<String, List<TopicPartition>> result = assignor.assign(partitionCounts, subscriptions);
+
+        List<TopicPartition> expected = Arrays.asList(
+                tp(firstTopic, 0), tp(secondTopic, 0), tp(secondTopic, 1));
+        assertEquals(expected, result.get(member));
+    }
+
+    /** With one partition and two consumers, consumer1 gets it and consumer2 gets nothing. */
+    @Test
+    public void testTwoConsumersOneTopicOnePartition() {
+        String shared = "topic";
+        String firstConsumer = "consumer1";
+        String secondConsumer = "consumer2";
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(shared, 1);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(firstConsumer, topics(shared));
+        subscriptions.put(secondConsumer, topics(shared));
+
+        Map<String, List<TopicPartition>> result = assignor.assign(partitionCounts, subscriptions);
+
+        assertEquals(Arrays.asList(tp(shared, 0)), result.get(firstConsumer));
+        assertEquals(Collections.<TopicPartition>emptyList(), result.get(secondConsumer));
+    }
+
+    /** With two partitions and two consumers, each consumer gets exactly one partition. */
+    @Test
+    public void testTwoConsumersOneTopicTwoPartitions() {
+        String shared = "topic";
+        String firstConsumer = "consumer1";
+        String secondConsumer = "consumer2";
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(shared, 2);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(firstConsumer, topics(shared));
+        subscriptions.put(secondConsumer, topics(shared));
+
+        Map<String, List<TopicPartition>> result = assignor.assign(partitionCounts, subscriptions);
+
+        assertEquals(Arrays.asList(tp(shared, 0)), result.get(firstConsumer));
+        assertEquals(Arrays.asList(tp(shared, 1)), result.get(secondConsumer));
+    }
+
+    /**
+     * Three consumers share topic1 while only consumer2 also subscribes to topic2; consumer2 is
+     * expected to own one topic1 partition plus every topic2 partition.
+     */
+    @Test
+    public void testMultipleConsumersMixedTopics() {
+        String firstTopic = "topic1";
+        String secondTopic = "topic2";
+        String firstConsumer = "consumer1";
+        String secondConsumer = "consumer2";
+        String thirdConsumer = "consumer3";
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(firstTopic, 3);
+        partitionCounts.put(secondTopic, 2);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(firstConsumer, topics(firstTopic));
+        subscriptions.put(secondConsumer, topics(firstTopic, secondTopic));
+        subscriptions.put(thirdConsumer, topics(firstTopic));
+
+        Map<String, List<TopicPartition>> result = assignor.assign(partitionCounts, subscriptions);
+
+        assertEquals(Arrays.asList(tp(firstTopic, 0)), result.get(firstConsumer));
+        assertEquals(
+                Arrays.asList(tp(firstTopic, 1), tp(secondTopic, 0), tp(secondTopic, 1)),
+                result.get(secondConsumer));
+        assertEquals(Arrays.asList(tp(firstTopic, 2)), result.get(thirdConsumer));
+    }
+
+    /**
+     * Two consumers subscribed to the same two three-partition topics; the six partitions are
+     * expected to alternate between the consumers, three each.
+     */
+    @Test
+    public void testTwoConsumersTwoTopicsSixPartitions() {
+        String firstTopic = "topic1";
+        String secondTopic = "topic2";
+        String firstConsumer = "consumer1";
+        String secondConsumer = "consumer2";
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(firstTopic, 3);
+        partitionCounts.put(secondTopic, 3);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(firstConsumer, topics(firstTopic, secondTopic));
+        subscriptions.put(secondConsumer, topics(firstTopic, secondTopic));
+
+        Map<String, List<TopicPartition>> result = assignor.assign(partitionCounts, subscriptions);
+
+        assertEquals(
+                Arrays.asList(tp(firstTopic, 0), tp(firstTopic, 2), tp(secondTopic, 1)),
+                result.get(firstConsumer));
+        assertEquals(
+                Arrays.asList(tp(firstTopic, 1), tp(secondTopic, 0), tp(secondTopic, 2)),
+                result.get(secondConsumer));
+    }
+
+    /**
+     * Wraps the given topic names in a list.
+     *
+     * @param topics the topic names
+     * @return a list view of {@code topics}
+     */
+    public static List<String> topics(String... topics) {
+        return Arrays.asList(topics);
+    }
+
+    /**
+     * Shorthand for constructing a {@link TopicPartition}.
+     *
+     * @param topic the topic name
+     * @param partition the partition number
+     * @return a new {@code TopicPartition} for the given topic and partition
+     */
+    public static TopicPartition tp(String topic, int partition) {
+        return new TopicPartition(topic, partition);
+    }
+
+}