You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:16 UTC
[19/37] kafka-1462;
Add new request and response formats for the new consumer and
coordinator communication; patched by Jun Rao;
reviewed by Guozhang Wang and Jay Kreps
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 2652c32..7d90fce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -12,6 +12,7 @@
*/
package org.apache.kafka.common.requests;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -20,50 +21,112 @@ import java.util.Map;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-public class MetadataResponse {
+public class MetadataResponse extends AbstractRequestResponse {
+ private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
+ private static String BROKERS_KEY_NAME = "brokers";
+ private static String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
+
+ // broker level field names
+ private static String NODE_ID_KEY_NAME = "node_id";
+ private static String HOST_KEY_NAME = "host";
+ private static String PORT_KEY_NAME = "port";
+
+ // topic level field names
+ private static String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
+ private static String TOPIC_KEY_NAME = "topic";
+ private static String PARTITION_METADATA_KEY_NAME = "partition_metadata";
+
+ // partition level field names
+ private static String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
+ private static String PARTITION_KEY_NAME = "partition_id";
+ private static String LEADER_KEY_NAME = "leader";
+ private static String REPLICAS_KEY_NAME = "replicas";
+ private static String ISR_KEY_NAME = "isr";
private final Cluster cluster;
private final Map<String, Errors> errors;
- public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
+ public MetadataResponse(Cluster cluster) {
+ super(new Struct(curSchema));
+
+ List<Struct> brokerArray = new ArrayList<Struct>();
+ for (Node node: cluster.nodes()) {
+ Struct broker = struct.instance(BROKERS_KEY_NAME);
+ broker.set(NODE_ID_KEY_NAME, node.id());
+ broker.set(HOST_KEY_NAME, node.host());
+ broker.set(PORT_KEY_NAME, node.port());
+ brokerArray.add(broker);
+ }
+ struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
+
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (String topic: cluster.topics()) {
+ Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
+ topicData.set(TOPIC_ERROR_CODE_KEY_NAME, (short)0); // no error
+ topicData.set(TOPIC_KEY_NAME, topic);
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
+ Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
+ partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, (short)0); // no error
+ partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
+ partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
+ ArrayList<Integer> replicas = new ArrayList<Integer>();
+ for (Node node: fetchPartitionData.replicas())
+ replicas.add(node.id());
+ partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
+ ArrayList<Integer> isr = new ArrayList<Integer>();
+ for (Node node: fetchPartitionData.inSyncReplicas())
+ isr.add(node.id());
+ partitionData.set(ISR_KEY_NAME, isr.toArray());
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
+
this.cluster = cluster;
- this.errors = errors;
+ this.errors = new HashMap<String, Errors>();
}
public MetadataResponse(Struct struct) {
+ super(struct);
Map<String, Errors> errors = new HashMap<String, Errors>();
Map<Integer, Node> brokers = new HashMap<Integer, Node>();
- Object[] brokerStructs = (Object[]) struct.get("brokers");
+ Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
for (int i = 0; i < brokerStructs.length; i++) {
Struct broker = (Struct) brokerStructs[i];
- int nodeId = (Integer) broker.get("node_id");
- String host = (String) broker.get("host");
- int port = (Integer) broker.get("port");
+ int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+ String host = broker.getString(HOST_KEY_NAME);
+ int port = broker.getInt(PORT_KEY_NAME);
brokers.put(nodeId, new Node(nodeId, host, port));
}
List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
- Object[] topicInfos = (Object[]) struct.get("topic_metadata");
+ Object[] topicInfos = (Object[]) struct.get(TOPIC_METATDATA_KEY_NAME);
for (int i = 0; i < topicInfos.length; i++) {
Struct topicInfo = (Struct) topicInfos[i];
- short topicError = topicInfo.getShort("topic_error_code");
- String topic = topicInfo.getString("topic");
+ short topicError = topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME);
+ String topic = topicInfo.getString(TOPIC_KEY_NAME);
if (topicError == Errors.NONE.code()) {
- Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
+ Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
for (int j = 0; j < partitionInfos.length; j++) {
Struct partitionInfo = (Struct) partitionInfos[j];
- short partError = partitionInfo.getShort("partition_error_code");
+ short partError = partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME);
if (partError == Errors.NONE.code()) {
- int partition = partitionInfo.getInt("partition_id");
- int leader = partitionInfo.getInt("leader");
+ int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
+ int leader = partitionInfo.getInt(LEADER_KEY_NAME);
Node leaderNode = leader == -1 ? null : brokers.get(leader);
- Object[] replicas = (Object[]) partitionInfo.get("replicas");
+ Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
Node[] replicaNodes = new Node[replicas.length];
for (int k = 0; k < replicas.length; k++)
replicaNodes[k] = brokers.get(replicas[k]);
- Object[] isr = (Object[]) partitionInfo.get("isr");
+ Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
Node[] isrNodes = new Node[isr.length];
for (int k = 0; k < isr.length; k++)
isrNodes[k] = brokers.get(isr[k]);
@@ -86,4 +149,7 @@ public class MetadataResponse {
return this.cluster;
}
+ public static MetadataResponse parse(ByteBuffer buffer) {
+ return new MetadataResponse(((Struct) curSchema.read(buffer)));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
new file mode 100644
index 0000000..3ee5cba
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This wrapper supports both v0 and v1 of OffsetCommitRequest.
+ */
+public class OffsetCommitRequest extends AbstractRequestResponse {
+    /** Request schema for the current version of the offset commit API. */
+    public static final Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
+
+    // request level field names
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String COMMIT_OFFSET_KEY_NAME = "offset";
+    private static final String TIMESTAMP_KEY_NAME = "timestamp";
+    private static final String METADATA_KEY_NAME = "metadata";
+
+    /** Generation id reported when the request does not carry one (version 0). */
+    public static final int DEFAULT_GENERATION_ID = -1;
+    /** Consumer id reported when the request does not carry one (version 0). */
+    public static final String DEFAULT_CONSUMER_ID = "";
+
+    private final String groupId;
+    private final int generationId;
+    private final String consumerId;
+    private final Map<TopicPartition, PartitionData> offsetData;
+
+    /**
+     * The offset, timestamp and metadata string committed for a single partition.
+     */
+    public static final class PartitionData {
+        public final long offset;
+        public final long timestamp;
+        public final String metadata;
+
+        public PartitionData(long offset, long timestamp, String metadata) {
+            this.offset = offset;
+            this.timestamp = timestamp;
+            this.metadata = metadata;
+        }
+    }
+
+    /**
+     * Constructor for version 0. The generation id and consumer id are not part of the version 0 struct;
+     * the accessors return {@link #DEFAULT_GENERATION_ID} and {@link #DEFAULT_CONSUMER_ID}.
+     *
+     * @param groupId the id of the group committing offsets
+     * @param offsetData the data to commit, keyed by topic partition
+     * @deprecated use {@link #OffsetCommitRequest(String, int, String, Map)} (version 1) instead
+     */
+    @Deprecated
+    public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
+        initCommonFields(groupId, offsetData);
+        this.groupId = groupId;
+        this.generationId = DEFAULT_GENERATION_ID;
+        this.consumerId = DEFAULT_CONSUMER_ID;
+        this.offsetData = offsetData;
+    }
+
+    /**
+     * Constructor for version 1 (built against the current request schema).
+     *
+     * @param groupId the id of the group committing offsets
+     * @param generationId the group generation id
+     * @param consumerId the id of the committing consumer
+     * @param offsetData the data to commit, keyed by topic partition
+     */
+    public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(curSchema));
+
+        initCommonFields(groupId, offsetData);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        this.groupId = groupId;
+        this.generationId = generationId;
+        this.consumerId = consumerId;
+        this.offsetData = offsetData;
+    }
+
+    /**
+     * Fills the struct fields shared by both versions: the group id and the commit data, grouped by topic
+     * and then by partition.
+     */
+    private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
+
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData commitData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(COMMIT_OFFSET_KEY_NAME, commitData.offset);
+                partitionData.set(TIMESTAMP_KEY_NAME, commitData.timestamp);
+                partitionData.set(METADATA_KEY_NAME, commitData.metadata);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+    }
+
+    /**
+     * Wraps an already populated struct and extracts the group id, commit data and, when the struct has
+     * those fields, the generation id and consumer id.
+     *
+     * @param struct a struct laid out according to one of the offset commit request schemas
+     */
+    public OffsetCommitRequest(Struct struct) {
+        super(struct);
+        offsetData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_NAME);
+            for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionDataObj;
+                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                long offset = partitionStruct.getLong(COMMIT_OFFSET_KEY_NAME);
+                long timestamp = partitionStruct.getLong(TIMESTAMP_KEY_NAME);
+                String metadata = partitionStruct.getString(METADATA_KEY_NAME);
+                PartitionData partitionData = new PartitionData(offset, timestamp, metadata);
+                offsetData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        // This field only exists in v1.
+        if (struct.hasField(GENERATION_ID_KEY_NAME))
+            generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+        else
+            generationId = DEFAULT_GENERATION_ID;
+
+        // This field only exists in v1.
+        if (struct.hasField(CONSUMER_ID_KEY_NAME))
+            consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        else
+            consumerId = DEFAULT_CONSUMER_ID;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int generationId() {
+        return generationId;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public Map<TopicPartition, PartitionData> offsetData() {
+        return offsetData;
+    }
+
+    /**
+     * Reads a request from the buffer using the request schema of the given version.
+     *
+     * @param buffer the buffer to read the request from
+     * @param versionId the version of the offset commit request schema to read with
+     */
+    public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
+        Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
+        return new OffsetCommitRequest(((Struct) schema.read(buffer)));
+    }
+
+    /**
+     * Reads a request from the buffer using the current request schema.
+     *
+     * @param buffer the buffer to read the request from
+     */
+    public static OffsetCommitRequest parse(ByteBuffer buffer) {
+        return new OffsetCommitRequest(((Struct) curSchema.read(buffer)));
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
new file mode 100644
index 0000000..711232a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wrapper around the offset commit response struct: one error code per committed topic partition.
+ */
+public class OffsetCommitResponse extends AbstractRequestResponse {
+    /** Response schema for the current version of the offset commit API. */
+    public static final Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
+
+    // topic level fields
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+    // partition level fields
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private final Map<TopicPartition, Short> responseData;
+
+    /**
+     * Builds the response struct from per-partition error codes, grouped by topic.
+     *
+     * @param responseData the error code for each topic partition
+     */
+    public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
+        super(new Struct(curSchema));
+
+        Map<String, Map<Integer, Short>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, Short>> entries : topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, Short> partitionEntry : entries.getValue().entrySet()) {
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue());
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        this.responseData = responseData;
+    }
+
+    /**
+     * Wraps an already populated struct and extracts the error code of every topic partition in it.
+     *
+     * @param struct a struct laid out according to the offset commit response schema
+     */
+    public OffsetCommitResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, Short>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                responseData.put(new TopicPartition(topic, partition), errorCode);
+            }
+        }
+    }
+
+    public Map<TopicPartition, Short> responseData() {
+        return responseData;
+    }
+
+    /**
+     * Reads a response from the buffer using the current response schema.
+     *
+     * @param buffer the buffer to read the response from
+     */
+    public static OffsetCommitResponse parse(ByteBuffer buffer) {
+        return new OffsetCommitResponse(((Struct) curSchema.read(buffer)));
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
new file mode 100644
index 0000000..90d5135
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This wrapper supports both v0 and v1 of OffsetFetchRequest.
+ */
+public class OffsetFetchRequest extends AbstractRequestResponse {
+    /** Request schema for the current version of the offset fetch API. */
+    public static final Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+
+    // NOTE(review): these two constants are not referenced anywhere in this class; they are kept because
+    // they are public and may be used by callers.
+    public static final int DEFAULT_GENERATION_ID = -1;
+    public static final String DEFAULT_CONSUMER_ID = "";
+
+    private final String groupId;
+    private final List<TopicPartition> partitions;
+
+    /**
+     * Builds the request struct for the given group and partitions, grouping the partitions by topic.
+     *
+     * @param groupId the id of the group whose offsets are requested
+     * @param partitions the topic partitions to fetch offsets for
+     */
+    public OffsetFetchRequest(String groupId, List<TopicPartition> partitions) {
+        super(new Struct(curSchema));
+
+        Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
+
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, List<Integer>> entries : topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Integer partitionId : entries.getValue()) {
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionId);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        this.groupId = groupId;
+        this.partitions = partitions;
+    }
+
+    /**
+     * Wraps an already populated struct and extracts the group id and the requested topic partitions.
+     *
+     * @param struct a struct laid out according to the offset fetch request schema
+     */
+    public OffsetFetchRequest(Struct struct) {
+        super(struct);
+        partitions = new ArrayList<TopicPartition>();
+        for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_NAME);
+            for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionData = (Struct) partitionDataObj;
+                int partition = partitionData.getInt(PARTITION_KEY_NAME);
+                partitions.add(new TopicPartition(topic, partition));
+            }
+        }
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public List<TopicPartition> partitions() {
+        return partitions;
+    }
+
+    /**
+     * Reads a request from the buffer using the current request schema.
+     *
+     * @param buffer the buffer to read the request from
+     */
+    public static OffsetFetchRequest parse(ByteBuffer buffer) {
+        return new OffsetFetchRequest(((Struct) curSchema.read(buffer)));
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
new file mode 100644
index 0000000..6b7c269
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wrapper around the offset fetch response struct: offset, metadata and error code per topic partition.
+ */
+public class OffsetFetchResponse extends AbstractRequestResponse {
+    /** Response schema for the current version of the offset fetch API. */
+    public static final Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
+
+    // topic level fields
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+    // partition level fields
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String COMMIT_OFFSET_KEY_NAME = "offset";
+    private static final String METADATA_KEY_NAME = "metadata";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private final Map<TopicPartition, PartitionData> responseData;
+
+    /**
+     * The offset, metadata string and error code returned for a single partition.
+     */
+    public static final class PartitionData {
+        public final long offset;
+        public final String metadata;
+        public final short errorCode;
+
+        public PartitionData(long offset, String metadata, short errorCode) {
+            this.offset = offset;
+            this.metadata = metadata;
+            this.errorCode = errorCode;
+        }
+    }
+
+    /**
+     * Builds the response struct from per-partition results, grouped by topic.
+     *
+     * @param responseData the fetched offset data for each topic partition
+     */
+    public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) {
+        super(new Struct(curSchema));
+
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
+                partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
+                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        this.responseData = responseData;
+    }
+
+    /**
+     * Wraps an already populated struct and extracts the offset data of every topic partition in it.
+     *
+     * @param struct a struct laid out according to the offset fetch response schema
+     */
+    public OffsetFetchResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
+                String metadata = partitionResponse.getString(METADATA_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                PartitionData partitionData = new PartitionData(offset, metadata, errorCode);
+                responseData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    public Map<TopicPartition, PartitionData> responseData() {
+        return responseData;
+    }
+
+    /**
+     * Reads a response from the buffer using the current response schema.
+     *
+     * @param buffer the buffer to read the response from
+     */
+    public static OffsetFetchResponse parse(ByteBuffer buffer) {
+        return new OffsetFetchResponse(((Struct) curSchema.read(buffer)));
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 6036f6a..3dbba8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -1,71 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.MemoryRecords;
+public class ProduceRequest extends AbstractRequestResponse {
+    /** Request schema for the current version of the produce API. */
+    public static final Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
+
+    // request level field names
+    private static final String ACKS_KEY_NAME = "acks";
+    private static final String TIMEOUT_KEY_NAME = "timeout";
+    private static final String TOPIC_DATA_KEY_NAME = "topic_data";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_DATA_KEY_NAME = "data";
-public class ProduceRequest {
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String RECORD_SET_KEY_NAME = "record_set";
private final short acks;
private final int timeout;
- private final Map<String, List<PartitionRecords>> records;
+ private final Map<TopicPartition, ByteBuffer> partitionRecords;
- public ProduceRequest(short acks, int timeout) {
+ public ProduceRequest(short acks, int timeout, Map<TopicPartition, ByteBuffer> partitionRecords) {
+ super(new Struct(curSchema));
+ Map<String, Map<Integer, ByteBuffer>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
+ struct.set(ACKS_KEY_NAME, acks);
+ struct.set(TIMEOUT_KEY_NAME, timeout);
+ List<Struct> topicDatas = new ArrayList<Struct>(recordsByTopic.size());
+ for (Map.Entry<String, Map<Integer, ByteBuffer>> entry : recordsByTopic.entrySet()) {
+ Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, ByteBuffer> partitionEntry : entry.getValue().entrySet()) {
+ ByteBuffer buffer = partitionEntry.getValue().duplicate();
+ Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
+ .set(PARTITION_KEY_NAME, partitionEntry.getKey())
+ .set(RECORD_SET_KEY_NAME, buffer);
+ partitionArray.add(part);
+ }
+ topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
+ topicDatas.add(topicData);
+ }
+ struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
this.acks = acks;
this.timeout = timeout;
- this.records = new HashMap<String, List<PartitionRecords>>();
+ this.partitionRecords = partitionRecords;
}
- public void add(TopicPartition tp, MemoryRecords recs) {
- List<PartitionRecords> found = this.records.get(tp.topic());
- if (found == null) {
- found = new ArrayList<PartitionRecords>();
- records.put(tp.topic(), found);
+ public ProduceRequest(Struct struct) {
+ super(struct);
+ partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
+ for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
+ Struct topicData = (Struct) topicDataObj;
+ String topic = topicData.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+ partitionRecords.put(new TopicPartition(topic, partition), records);
+ }
}
- found.add(new PartitionRecords(tp, recs));
+ acks = struct.getShort(ACKS_KEY_NAME);
+ timeout = struct.getInt(TIMEOUT_KEY_NAME);
}
- public Struct toStruct() {
- Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id));
- produce.set("acks", acks);
- produce.set("timeout", timeout);
- List<Struct> topicDatas = new ArrayList<Struct>(records.size());
- for (Map.Entry<String, List<PartitionRecords>> entry : records.entrySet()) {
- Struct topicData = produce.instance("topic_data");
- topicData.set("topic", entry.getKey());
- List<PartitionRecords> parts = entry.getValue();
- Object[] partitionData = new Object[parts.size()];
- for (int i = 0; i < parts.size(); i++) {
- ByteBuffer buffer = parts.get(i).records.buffer();
- buffer.flip();
- Struct part = topicData.instance("data")
- .set("partition", parts.get(i).topicPartition.partition())
- .set("record_set", buffer);
- partitionData[i] = part;
- }
- topicData.set("data", partitionData);
- topicDatas.add(topicData);
- }
- produce.set("topic_data", topicDatas.toArray());
- return produce;
+ public short acks() {
+ return acks;
}
- private static final class PartitionRecords {
- public final TopicPartition topicPartition;
- public final MemoryRecords records;
+ public int timeout() {
+ return timeout;
+ }
- public PartitionRecords(TopicPartition topicPartition, MemoryRecords records) {
- this.topicPartition = topicPartition;
- this.records = records;
- }
+ public Map<TopicPartition, ByteBuffer> partitionRecords() {
+ return partitionRecords;
}
+ public static ProduceRequest parse(ByteBuffer buffer) {
+ return new ProduceRequest(((Struct) curSchema.read(buffer)));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 6cf4fb7..5220464 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -12,67 +12,83 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.Struct;
+public class ProduceResponse extends AbstractRequestResponse {
+ private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
+ private static String RESPONSES_KEY_NAME = "responses";
+
+ // topic level field names
+ private static String TOPIC_KEY_NAME = "topic";
+ private static String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
-public class ProduceResponse {
+ // partition level field names
+ private static String PARTITION_KEY_NAME = "partition";
+ private static String ERROR_CODE_KEY_NAME = "error_code";
+ private static String BASE_OFFSET_KEY_NAME = "base_offset";
private final Map<TopicPartition, PartitionResponse> responses;
- public ProduceResponse() {
- this.responses = new HashMap<TopicPartition, PartitionResponse>();
+ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
+ super(new Struct(curSchema));
+ Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
+ List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
+ for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
+ PartitionResponse part = partitionEntry.getValue();
+ Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
+ .set(PARTITION_KEY_NAME, partitionEntry.getKey())
+ .set(ERROR_CODE_KEY_NAME, part.errorCode)
+ .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
+ partitionArray.add(partStruct);
+ }
+ topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
+ topicDatas.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
+ this.responses = responses;
}
public ProduceResponse(Struct struct) {
+ super(struct);
responses = new HashMap<TopicPartition, PartitionResponse>();
- for (Object topicResponse : (Object[]) struct.get("responses")) {
+ for (Object topicResponse : struct.getArray("responses")) {
Struct topicRespStruct = (Struct) topicResponse;
- String topic = (String) topicRespStruct.get("topic");
- for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
+ String topic = topicRespStruct.getString("topic");
+ for (Object partResponse : topicRespStruct.getArray("partition_responses")) {
Struct partRespStruct = (Struct) partResponse;
- int partition = (Integer) partRespStruct.get("partition");
- short errorCode = (Short) partRespStruct.get("error_code");
- long offset = (Long) partRespStruct.get("base_offset");
+ int partition = partRespStruct.getInt("partition");
+ short errorCode = partRespStruct.getShort("error_code");
+ long offset = partRespStruct.getLong("base_offset");
TopicPartition tp = new TopicPartition(topic, partition);
- responses.put(tp, new PartitionResponse(partition, errorCode, offset));
+ responses.put(tp, new PartitionResponse(errorCode, offset));
}
}
}
- public void addResponse(TopicPartition tp, int partition, short error, long baseOffset) {
- this.responses.put(tp, new PartitionResponse(partition, error, baseOffset));
- }
-
public Map<TopicPartition, PartitionResponse> responses() {
return this.responses;
}
- @Override
- public String toString() {
- StringBuilder b = new StringBuilder();
- b.append('{');
- boolean isFirst = true;
- for (Map.Entry<TopicPartition, PartitionResponse> entry : responses.entrySet()) {
- if (isFirst)
- isFirst = false;
- else
- b.append(',');
- b.append(entry.getKey() + " : " + entry.getValue());
- }
- b.append('}');
- return b.toString();
- }
-
- public static class PartitionResponse {
- public int partitionId;
+ public static final class PartitionResponse {
public short errorCode;
public long baseOffset;
- public PartitionResponse(int partitionId, short errorCode, long baseOffset) {
- this.partitionId = partitionId;
+ public PartitionResponse(short errorCode, long baseOffset) {
this.errorCode = errorCode;
this.baseOffset = baseOffset;
}
@@ -81,9 +97,7 @@ public class ProduceResponse {
public String toString() {
StringBuilder b = new StringBuilder();
b.append('{');
- b.append("pid: ");
- b.append(partitionId);
- b.append(",error: ");
+ b.append("error: ");
b.append(errorCode);
b.append(",offset: ");
b.append(baseOffset);
@@ -91,4 +105,8 @@ public class ProduceResponse {
return b.toString();
}
}
+
+ public static ProduceResponse parse(ByteBuffer buffer) {
+ return new ProduceResponse(((Struct) curSchema.read(buffer)));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 66cc2fe..f459a2a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -24,18 +24,24 @@ import org.apache.kafka.common.protocol.types.Struct;
/**
* The header for a request in the Kafka protocol
*/
-public class RequestHeader {
+public class RequestHeader extends AbstractRequestResponse {
private static Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
private static Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
private static Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
private static Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
- private final Struct header;
+ private final short apiKey;
+ private final short apiVersion;
+ private final String clientId;
+ private final int correlationId;
public RequestHeader(Struct header) {
- super();
- this.header = header;
+ super(header);
+ apiKey = struct.getShort(API_KEY_FIELD);
+ apiVersion = struct.getShort(API_VERSION_FIELD);
+ clientId = struct.getString(CLIENT_ID_FIELD);
+ correlationId = struct.getInt(CORRELATION_ID_FIELD);
}
public RequestHeader(short apiKey, String client, int correlation) {
@@ -43,43 +49,34 @@ public class RequestHeader {
}
public RequestHeader(short apiKey, short version, String client, int correlation) {
- this(new Struct(Protocol.REQUEST_HEADER));
- this.header.set(API_KEY_FIELD, apiKey);
- this.header.set(API_VERSION_FIELD, version);
- this.header.set(CLIENT_ID_FIELD, client);
- this.header.set(CORRELATION_ID_FIELD, correlation);
+ super(new Struct(Protocol.REQUEST_HEADER));
+ struct.set(API_KEY_FIELD, apiKey);
+ struct.set(API_VERSION_FIELD, version);
+ struct.set(CLIENT_ID_FIELD, client);
+ struct.set(CORRELATION_ID_FIELD, correlation);
+ this.apiKey = apiKey;
+ this.apiVersion = version;
+ this.clientId = client;
+ this.correlationId = correlation;
}
public short apiKey() {
- return (Short) this.header.get(API_KEY_FIELD);
+ return apiKey;
}
public short apiVersion() {
- return (Short) this.header.get(API_VERSION_FIELD);
+ return apiVersion;
}
public String clientId() {
- return (String) this.header.get(CLIENT_ID_FIELD);
+ return clientId;
}
public int correlationId() {
- return (Integer) this.header.get(CORRELATION_ID_FIELD);
+ return correlationId;
}
public static RequestHeader parse(ByteBuffer buffer) {
return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
}
-
- public void writeTo(ByteBuffer buffer) {
- header.writeTo(buffer);
- }
-
- public int sizeOf() {
- return header.sizeOf();
- }
-
- @Override
- public String toString() {
- return header.toString();
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
index 257b828..dd63853 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
@@ -28,31 +28,25 @@ import org.apache.kafka.common.protocol.types.Struct;
/**
* A response header in the kafka protocol.
*/
-public class ResponseHeader {
+public class ResponseHeader extends AbstractRequestResponse {
private static Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
- private final Struct header;
+ private final int correlationId;
public ResponseHeader(Struct header) {
- this.header = header;
+ super(header);
+ correlationId = struct.getInt(CORRELATION_KEY_FIELD);
}
public ResponseHeader(int correlationId) {
- this(new Struct(Protocol.RESPONSE_HEADER));
- this.header.set(CORRELATION_KEY_FIELD, correlationId);
+ super(new Struct(Protocol.RESPONSE_HEADER));
+ struct.set(CORRELATION_KEY_FIELD, correlationId);
+ this.correlationId = correlationId;
}
public int correlationId() {
- return (Integer) header.get(CORRELATION_KEY_FIELD);
- }
-
- public void writeTo(ByteBuffer buffer) {
- header.writeTo(buffer);
- }
-
- public int sizeOf() {
- return header.sizeOf();
+ return correlationId;
}
public static ResponseHeader parse(ByteBuffer buffer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
new file mode 100644
index 0000000..ba38637
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CollectionUtils {
+ /**
+ * Groups per-partition data by topic.
+ * @param data map from each topic-partition to its data
+ * @param <T> type of the per-partition data
+ * @return map from topic name to a map of partition id to that partition's data
+ */
+ public static <T> Map<String, Map<Integer, T>> groupDataByTopic(Map<TopicPartition, T> data) {
+ Map<String, Map<Integer, T>> dataByTopic = new HashMap<String, Map<Integer, T>>();
+ for (Map.Entry<TopicPartition, T> entry: data.entrySet()) {
+ String topic = entry.getKey().topic();
+ int partition = entry.getKey().partition();
+ Map<Integer, T> topicData = dataByTopic.get(topic);
+ if (topicData == null) {
+ topicData = new HashMap<Integer, T>();
+ dataByTopic.put(topic, topicData);
+ }
+ topicData.put(partition, entry.getValue());
+ }
+ return dataByTopic;
+ }
+
+ /**
+ * Groups partition ids by topic.
+ * @param partitions the topic-partitions to group
+ * @return map from topic name to the list of partition ids seen for that topic
+ */
+ public static Map<String, List<Integer>> groupDataByTopic(List<TopicPartition> partitions) {
+ Map<String, List<Integer>> partitionsByTopic = new HashMap<String, List<Integer>>();
+ for (TopicPartition tp: partitions) {
+ String topic = tp.topic();
+ List<Integer> topicData = partitionsByTopic.get(topic);
+ if (topicData == null) {
+ topicData = new ArrayList<Integer>();
+ partitionsByTopic.put(topic, topicData);
+ }
+ topicData.add(tp.partition());
+ }
+ return partitionsByTopic;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 2f98192..1a55242 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -7,11 +7,13 @@ import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
@@ -68,7 +70,7 @@ public class NetworkClientTest {
@Test
public void testSimpleRequestResponse() {
- ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000);
+ ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct());
ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
new file mode 100644
index 0000000..df37fc6
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class RequestResponseTest {
+
+ @Test
+ public void testSerialization() throws Exception{
+ List<AbstractRequestResponse> requestList = Arrays.asList(
+ createRequestHeader(),
+ createResponseHeader(),
+ createConsumerMetadataRequest(),
+ createConsumerMetadataResponse(),
+ createFetchRequest(),
+ createFetchResponse(),
+ createHeartBeatRequest(),
+ createHeartBeatResponse(),
+ createJoinGroupRequest(),
+ createJoinGroupResponse(),
+ createListOffsetRequest(),
+ createListOffsetResponse(),
+ createMetadataRequest(),
+ createMetadataResponse(),
+ createOffsetCommitRequest(),
+ createOffsetCommitResponse(),
+ createOffsetFetchRequest(),
+ createOffsetFetchResponse(),
+ createProduceRequest(),
+ createProduceResponse());
+
+ for (AbstractRequestResponse req: requestList) {
+ ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
+ req.writeTo(buffer);
+ buffer.rewind();
+ Method deserializer = req.getClass().getDeclaredMethod("parse", ByteBuffer.class);
+ AbstractRequestResponse deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer);
+ assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should be the same.", req, deserialized);
+ assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should have the same hashcode.",
+ req.hashCode(), deserialized.hashCode());
+ }
+ }
+
+ private AbstractRequestResponse createRequestHeader() {
+ return new RequestHeader((short)10, (short)1, "", 10);
+ }
+
+ private AbstractRequestResponse createResponseHeader() {
+ return new ResponseHeader(10);
+ }
+
+ private AbstractRequestResponse createConsumerMetadataRequest() {
+ return new ConsumerMetadataRequest("test-group");
+ }
+
+ private AbstractRequestResponse createConsumerMetadataResponse() {
+ return new ConsumerMetadataResponse((short)1, new Node(10, "host1", 2014));
+ }
+
+ private AbstractRequestResponse createFetchRequest() {
+ Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<TopicPartition, FetchRequest.PartitionData>();
+ fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
+ fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
+ return new FetchRequest(-1, 100, 100000, fetchData);
+ }
+
+ private AbstractRequestResponse createFetchResponse() {
+ Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+ responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData((short)0, 1000000, ByteBuffer.allocate(10)));
+ return new FetchResponse(responseData);
+ }
+
+ private AbstractRequestResponse createHeartBeatRequest() {
+ return new HeartbeatRequest("group1", 1, "consumer1");
+ }
+
+ private AbstractRequestResponse createHeartBeatResponse() {
+ return new HeartbeatResponse((short)0);
+ }
+
+ private AbstractRequestResponse createJoinGroupRequest() {
+ return new JoinGroupRequest("group1", 30000, Arrays.asList("topic1"), "consumer1", "strategy1");
+ }
+
+ private AbstractRequestResponse createJoinGroupResponse() {
+ return new JoinGroupResponse((short)0, 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
+ }
+
+ private AbstractRequestResponse createListOffsetRequest() {
+ Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
+ offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));
+ return new ListOffsetRequest(-1, offsetData);
+ }
+
+ private AbstractRequestResponse createListOffsetResponse() {
+ Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
+ responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData((short)0, Arrays.asList(100L)));
+ return new ListOffsetResponse(responseData);
+ }
+
+ private AbstractRequestResponse createMetadataRequest() {
+ return new MetadataRequest(Arrays.asList("topic1"));
+ }
+
+ private AbstractRequestResponse createMetadataResponse() {
+ Node node = new Node(1, "host1", 1001);
+ Node[] replicas = new Node[1];
+ replicas[0] = node;
+ Node[] isr = new Node[1];
+ isr[0] = node;
+ Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)));
+ return new MetadataResponse(cluster);
+ }
+
+ private AbstractRequestResponse createOffsetCommitRequest() {
+ Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
+ commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, 1000000, ""));
+ return new OffsetCommitRequest("group1", 100, "consumer1", commitData);
+ }
+
+ private AbstractRequestResponse createOffsetCommitResponse() {
+ Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
+ responseData.put(new TopicPartition("test", 0), (short)0);
+ return new OffsetCommitResponse(responseData);
+ }
+
+ private AbstractRequestResponse createOffsetFetchRequest() {
+ return new OffsetFetchRequest("group1", Arrays.asList(new TopicPartition("test11", 1)));
+ }
+
+ private AbstractRequestResponse createOffsetFetchResponse() {
+ Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
+ responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", (short)0));
+ return new OffsetFetchResponse(responseData);
+ }
+
+ private AbstractRequestResponse createProduceRequest() {
+ Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>();
+ produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
+ return new ProduceRequest((short)0, 5000, produceData);
+ }
+
+ private AbstractRequestResponse createProduceResponse() {
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+ responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse((short) 0, 10000));
+ return new ProduceResponse(responseData);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
index dfad6e6..6d00ed0 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -41,9 +41,9 @@ object ConsumerMetadataRequest {
case class ConsumerMetadataRequest(group: String,
versionId: Short = ConsumerMetadataRequest.CurrentVersion,
- override val correlationId: Int = 0,
+ correlationId: Int = 0,
clientId: String = ConsumerMetadataRequest.DefaultClientId)
- extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey), correlationId) {
+ extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey)) {
def sizeInBytes =
2 + /* versionId */
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index c72ca14..84f6017 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -40,8 +40,8 @@ object ConsumerMetadataResponse {
}
-case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
- extends RequestOrResponse(correlationId = correlationId) {
+case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int = 0)
+ extends RequestOrResponse() {
def sizeInBytes =
4 + /* correlationId */
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 7dacb20..5be393a 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -38,9 +38,9 @@ object ControlledShutdownRequest extends Logging {
}
case class ControlledShutdownRequest(val versionId: Short,
- override val correlationId: Int,
+ val correlationId: Int,
val brokerId: Int)
- extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey), correlationId){
+ extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){
def this(correlationId: Int, brokerId: Int) =
this(ControlledShutdownRequest.CurrentVersion, correlationId, brokerId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 46ec3db..5e0a1cf 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -39,10 +39,10 @@ object ControlledShutdownResponse {
}
-case class ControlledShutdownResponse(override val correlationId: Int,
+case class ControlledShutdownResponse(val correlationId: Int,
val errorCode: Short = ErrorMapping.NoError,
val partitionsRemaining: Set[TopicAndPartition])
- extends RequestOrResponse(correlationId = correlationId) {
+ extends RequestOrResponse() {
def sizeInBytes(): Int ={
var size =
4 /* correlation id */ +
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index a8b73ac..55a5982 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -60,13 +60,13 @@ object FetchRequest {
}
case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
- override val correlationId: Int = FetchRequest.DefaultCorrelationId,
+ correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
- extends RequestOrResponse(Some(RequestKeys.FetchKey), correlationId) {
+ extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
/**
* Partitions the request info into a map of maps (one for each topic).
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
new file mode 100644
index 0000000..fb022e8
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import org.apache.kafka.common.requests.AbstractRequestResponse
+
+/** Couples a header and a body (both AbstractRequestResponse) into one RequestOrResponse
+ *  so that the pair is written, sized and described together. */
+private[kafka] abstract class GenericRequestOrResponseAndHeader(val header: AbstractRequestResponse,
+    val body: AbstractRequestResponse,
+    val name: String,
+    override val requestId: Option[Short] = None)
+  extends RequestOrResponse(requestId) {
+
+  /** Writes the header into `buffer` first, then the body. */
+  def writeTo(buffer: ByteBuffer): Unit = {
+    header.writeTo(buffer)
+    body.writeTo(buffer)
+  }
+
+  /** Returns header.sizeOf() plus body.sizeOf(). */
+  def sizeInBytes(): Int = header.sizeOf() + body.sizeOf()
+
+  /** Same text as describe(true). */
+  override def toString(): String = describe(true)
+
+  /** Builds "Name: ...; header: ...; body: ..."; the `details` flag is not consulted here. */
+  override def describe(details: Boolean): String = {
+    val headerText = "; header: " + header.toString
+    val bodyText = "; body: " + body.toString
+    "Name: " + name + headerText + bodyText
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
new file mode 100644
index 0000000..932418b
--- /dev/null
+++ b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import org.apache.kafka.common.requests.{HeartbeatResponse, ResponseHeader, HeartbeatRequest, RequestHeader}
+
+object HeartbeatRequestAndHeader {
+  /** Parses a RequestHeader and then a HeartbeatRequest from `buffer`, in that order. */
+  def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = {
+    val parsedHeader = RequestHeader.parse(buffer)
+    new HeartbeatRequestAndHeader(parsedHeader, HeartbeatRequest.parse(buffer))
+  }
+}
+
+case class HeartbeatRequestAndHeader(override val header: RequestHeader, override val body: HeartbeatRequest)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) {
+
+  /** Sends a HeartbeatResponse on `requestChannel` carrying the code ErrorMapping gives for `e`'s class, under this request's correlation id. */
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val replyHeader = new ResponseHeader(header.correlationId)
+    val replyBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(new HeartbeatResponseAndHeader(replyHeader, replyBody))))
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
new file mode 100644
index 0000000..556f38d
--- /dev/null
+++ b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.common.requests.{ResponseHeader, HeartbeatResponse}
+import java.nio.ByteBuffer
+
+object HeartbeatResponseAndHeader {
+  /** Parses a ResponseHeader and then a HeartbeatResponse from `buffer`, in that order. */
+  def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = {
+    val parsedHeader = ResponseHeader.parse(buffer)
+    new HeartbeatResponseAndHeader(parsedHeader, HeartbeatResponse.parse(buffer))
+  }
+}
+
+/** A HeartbeatResponse together with its ResponseHeader; passes None as the request id. */
+case class HeartbeatResponseAndHeader(override val header: ResponseHeader, override val body: HeartbeatResponse)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
new file mode 100644
index 0000000..9aea28c
--- /dev/null
+++ b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import org.apache.kafka.common.requests._
+import kafka.network.RequestChannel.Response
+import scala.Some
+
+object JoinGroupRequestAndHeader {
+  /** Parses a RequestHeader and then a JoinGroupRequest from `buffer`, in that order. */
+  def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = {
+    val parsedHeader = RequestHeader.parse(buffer)
+    new JoinGroupRequestAndHeader(parsedHeader, JoinGroupRequest.parse(buffer))
+  }
+}
+
+case class JoinGroupRequestAndHeader(override val header: RequestHeader, override val body: JoinGroupRequest)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) {
+
+  /** Sends a JoinGroupResponse on `requestChannel` carrying the code ErrorMapping gives for `e`'s class, under this request's correlation id. */
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponseHeader = new ResponseHeader(header.correlationId)
+    val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    val errorJoinGroupResponseAndHeader = new JoinGroupResponseAndHeader(errorResponseHeader, errorResponseBody)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorJoinGroupResponseAndHeader)))
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
new file mode 100644
index 0000000..7389ae6
--- /dev/null
+++ b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.common.requests.{JoinGroupResponse, ResponseHeader}
+import java.nio.ByteBuffer
+
+object JoinGroupResponseAndHeader {
+  /** Parses a ResponseHeader and then a JoinGroupResponse from `buffer`, in that order. */
+  def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = {
+    val parsedHeader = ResponseHeader.parse(buffer)
+    new JoinGroupResponseAndHeader(parsedHeader, JoinGroupResponse.parse(buffer))
+  }
+}
+
+/** A JoinGroupResponse together with its ResponseHeader; passes None as the request id. */
+case class JoinGroupResponseAndHeader(override val header: ResponseHeader, override val body: JoinGroupResponse)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 3e40817..4ff7e8f 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -129,13 +129,13 @@ object LeaderAndIsrRequest {
}
case class LeaderAndIsrRequest (versionId: Short,
- override val correlationId: Int,
+ correlationId: Int,
clientId: String,
controllerId: Int,
controllerEpoch: Int,
partitionStateInfos: Map[(String, Int), PartitionStateInfo],
leaders: Set[Broker])
- extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
+ extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int,
controllerEpoch: Int, correlationId: Int, clientId: String) = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index f636444..22ce48a 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -41,10 +41,10 @@ object LeaderAndIsrResponse {
}
-case class LeaderAndIsrResponse(override val correlationId: Int,
+case class LeaderAndIsrResponse(correlationId: Int,
responseMap: Map[(String, Int), Short],
errorCode: Short = ErrorMapping.NoError)
- extends RequestOrResponse(correlationId = correlationId) {
+ extends RequestOrResponse() {
def sizeInBytes(): Int ={
var size =
4 /* correlation id */ +