You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:17 UTC
[20/37] git commit: kafka-1462;
Add new request and response formats for the new consumer and
coordinator communication; patched by Jun Rao;
reviewed by Guozhang Wang and Jay Kreps
kafka-1462; Add new request and response formats for the new consumer and coordinator communication; patched by Jun Rao; reviewed by Guozhang Wang and Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc0e03f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc0e03f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc0e03f7
Branch: refs/heads/transactional_messaging
Commit: fc0e03f79131746da81c05c12e056862c08d79d4
Parents: 4ebcdfd
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Jul 17 18:20:01 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jul 17 18:20:01 2014 -0700
----------------------------------------------------------------------
.../clients/producer/internals/Sender.java | 12 +-
.../java/org/apache/kafka/common/Cluster.java | 15 +-
.../apache/kafka/common/protocol/ApiKeys.java | 7 +-
.../apache/kafka/common/protocol/Protocol.java | 280 ++++++++++++++++++-
.../kafka/common/protocol/types/Struct.java | 67 +++++
.../requests/AbstractRequestResponse.java | 66 +++++
.../requests/ConsumerMetadataRequest.java | 47 ++++
.../requests/ConsumerMetadataResponse.java | 69 +++++
.../kafka/common/requests/FetchRequest.java | 132 +++++++++
.../kafka/common/requests/FetchResponse.java | 110 ++++++++
.../kafka/common/requests/HeartbeatRequest.java | 64 +++++
.../common/requests/HeartbeatResponse.java | 45 +++
.../kafka/common/requests/JoinGroupRequest.java | 87 ++++++
.../common/requests/JoinGroupResponse.java | 102 +++++++
.../common/requests/ListOffsetRequest.java | 114 ++++++++
.../common/requests/ListOffsetResponse.java | 108 +++++++
.../kafka/common/requests/MetadataRequest.java | 29 +-
.../kafka/common/requests/MetadataResponse.java | 98 +++++--
.../common/requests/OffsetCommitRequest.java | 180 ++++++++++++
.../common/requests/OffsetCommitResponse.java | 87 ++++++
.../common/requests/OffsetFetchRequest.java | 98 +++++++
.../common/requests/OffsetFetchResponse.java | 107 +++++++
.../kafka/common/requests/ProduceRequest.java | 124 +++++---
.../kafka/common/requests/ProduceResponse.java | 96 ++++---
.../kafka/common/requests/RequestHeader.java | 49 ++--
.../kafka/common/requests/ResponseHeader.java | 22 +-
.../kafka/common/utils/CollectionUtils.java | 62 ++++
.../apache/kafka/clients/NetworkClientTest.java | 4 +-
.../common/requests/RequestResponseTest.java | 173 ++++++++++++
.../kafka/api/ConsumerMetadataRequest.scala | 4 +-
.../kafka/api/ConsumerMetadataResponse.scala | 4 +-
.../kafka/api/ControlledShutdownRequest.scala | 4 +-
.../kafka/api/ControlledShutdownResponse.scala | 4 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 4 +-
.../api/GenericRequestOrResponseAndHeader.scala | 45 +++
.../kafka/api/HeartbeatRequestAndHeader.scala | 39 +++
.../kafka/api/HeartbeatResponseAndHeader.scala | 28 ++
.../kafka/api/JoinGroupRequestAndHeader.scala | 40 +++
.../kafka/api/JoinGroupResponseAndHeader.scala | 28 ++
.../scala/kafka/api/LeaderAndIsrRequest.scala | 4 +-
.../scala/kafka/api/LeaderAndIsrResponse.scala | 4 +-
.../scala/kafka/api/OffsetCommitRequest.scala | 38 ++-
.../scala/kafka/api/OffsetCommitResponse.scala | 4 +-
.../scala/kafka/api/OffsetFetchRequest.scala | 4 +-
.../scala/kafka/api/OffsetFetchResponse.scala | 4 +-
.../main/scala/kafka/api/OffsetRequest.scala | 4 +-
.../main/scala/kafka/api/OffsetResponse.scala | 4 +-
.../main/scala/kafka/api/ProducerRequest.scala | 4 +-
.../main/scala/kafka/api/ProducerResponse.scala | 4 +-
core/src/main/scala/kafka/api/RequestKeys.scala | 7 +-
.../scala/kafka/api/RequestOrResponse.scala | 2 +-
.../scala/kafka/api/StopReplicaRequest.scala | 4 +-
.../scala/kafka/api/StopReplicaResponse.scala | 4 +-
.../scala/kafka/api/TopicMetadataRequest.scala | 4 +-
.../scala/kafka/api/TopicMetadataResponse.scala | 4 +-
.../scala/kafka/api/UpdateMetadataRequest.scala | 4 +-
.../kafka/api/UpdateMetadataResponse.scala | 4 +-
.../controller/ControllerChannelManager.scala | 8 +-
.../kafka/javaapi/OffsetCommitRequest.scala | 2 -
.../kafka/javaapi/TopicMetadataRequest.scala | 4 +-
.../api/RequestResponseSerializationTest.scala | 74 ++++-
61 files changed, 2610 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index a016269..8ebe7ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -12,6 +12,7 @@
*/
package org.apache.kafka.clients.producer.internals;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -276,13 +277,16 @@ public class Sender implements Runnable {
* Create a produce request from the given record batches
*/
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
- ProduceRequest request = new ProduceRequest(acks, timeout);
+ Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
for (RecordBatch batch : batches) {
- batch.records.buffer().flip();
- request.add(batch.topicPartition, batch.records);
- recordsByPartition.put(batch.topicPartition, batch);
+ TopicPartition tp = batch.topicPartition;
+ ByteBuffer recordsBuffer = batch.records.buffer();
+ recordsBuffer.flip();
+ produceRecordsByPartition.put(tp, recordsBuffer);
+ recordsByPartition.put(tp, batch);
}
+ ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct());
return new ClientRequest(now, acks != 0, send, recordsByPartition);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index c62707a..d3299b9 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -15,12 +15,7 @@ package org.apache.kafka.common;
import org.apache.kafka.common.utils.Utils;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
@@ -143,6 +138,14 @@ public final class Cluster {
return this.partitionsByNode.get(nodeId);
}
+ /**
+ * Get all topics.
+ * @return a set of all topics
+ */
+ public Set<String> topics() {
+ return this.partitionsByTopic.keySet();
+ }
+
@Override
public String toString() {
return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6fe7573..109fc96 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -30,8 +30,11 @@ public enum ApiKeys {
METADATA(3, "metadata"),
LEADER_AND_ISR(4, "leader_and_isr"),
STOP_REPLICA(5, "stop_replica"),
- OFFSET_COMMIT(6, "offset_commit"),
- OFFSET_FETCH(7, "offset_fetch");
+ OFFSET_COMMIT(8, "offset_commit"),
+ OFFSET_FETCH(9, "offset_fetch"),
+ CONSUMER_METADATA(10, "consumer_metadata"),
+ JOIN_GROUP(11, "join_group"),
+ HEARTBEAT(12, "heartbeat");
private static ApiKeys[] codeToType;
public static int MAX_API_KEY = -1;
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 044b030..7517b87 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -104,6 +104,264 @@ public class Protocol {
public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
+ /* Offset commit api */
+ public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("offset",
+ INT64,
+ "Message offset to be committed."),
+ new Field("timestamp",
+ INT64,
+ "Timestamp of the commit"),
+ new Field("metadata",
+ STRING,
+ "Any associated metadata the client wants to keep."));
+
+ public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to commit."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
+ "Partitions to commit offsets."));
+
+ public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("topics",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+ "Topics to commit offsets."));
+
+ public static Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("group_generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("topics",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+ "Topics to commit offsets."));
+
+ public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code",
+ INT16));
+
+ public static Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
+
+ public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+ public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
+ /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
+ public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+
+ /* Offset fetch api */
+ public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."));
+
+ public static Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to fetch offset."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
+ "Partitions to fetch offsets."));
+
+ public static Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("topics",
+ new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
+ "Topics to fetch offsets."));
+
+ public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("offset",
+ INT64,
+ "Last committed message offset."),
+ new Field("metadata",
+ STRING,
+ "Any associated metadata the client wants to keep."),
+ new Field("error_code",
+ INT16));
+
+ public static Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+
+ public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+
+ public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 };
+ public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 };
+
+ /* List offset api */
+ public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("timestamp",
+ INT64,
+ "Timestamp."),
+ new Field("max_num_offsets",
+ INT32,
+ "Maximum offsets to return."));
+
+ public static Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to list offset."),
+ new Field("partitions",
+ new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
+ "Partitions to list offset."));
+
+ public static Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+ INT32,
+ "Broker id of the follower. For normal consumers, use -1."),
+ new Field("topics",
+ new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
+ "Topics to list offsets."));
+
+ public static Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code",
+ INT16),
+ new Field("offsets",
+ new ArrayOf(INT64),
+ "A list of offsets."));
+
+ public static Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
+
+ public static Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
+
+ public static Schema[] LIST_OFFSET_REQUEST = new Schema[] { LIST_OFFSET_REQUEST_V0 };
+ public static Schema[] LIST_OFFSET_RESPONSE = new Schema[] { LIST_OFFSET_RESPONSE_V0 };
+
+ /* Fetch api */
+ public static Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("fetch_offset",
+ INT64,
+ "Message offset."),
+ new Field("max_bytes",
+ INT32,
+ "Maximum bytes to fetch."));
+
+ public static Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to fetch."),
+ new Field("partitions",
+ new ArrayOf(FETCH_REQUEST_PARTITION_V0),
+ "Partitions to fetch."));
+
+ public static Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
+ INT32,
+ "Broker id of the follower. For normal consumers, use -1."),
+ new Field("max_wait_time",
+ INT32,
+ "Maximum time in ms to wait for the response."),
+ new Field("min_bytes",
+ INT32,
+ "Minimum bytes to accumulate in the response."),
+ new Field("topics",
+ new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+ "Topics to fetch."));
+
+ public static Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code",
+ INT16),
+ new Field("high_watermark",
+ INT64,
+ "Last committed offset."),
+ new Field("record_set", BYTES));
+
+ public static Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
+
+ public static Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
+ public static Schema[] FETCH_REQUEST = new Schema[] { FETCH_REQUEST_V0 };
+ public static Schema[] FETCH_RESPONSE = new Schema[] { FETCH_RESPONSE_V0 };
+
+ /* Consumer metadata api */
+ public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."));
+
+ public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code",
+ INT16),
+ new Field("coordinator",
+ BROKER,
+ "Host and port information for the coordinator for a consumer group."));
+
+ public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 };
+ public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 };
+
+ /* Join group api */
+ public static Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("session_timeout",
+ INT32,
+ "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
+ new Field("topics",
+ new ArrayOf(STRING),
+ "An array of topics to subscribe to."),
+ new Field("consumer_id",
+ STRING,
+ "The assigned consumer id or an empty string for a new consumer."),
+ new Field("partition_assignment_strategy",
+ STRING,
+ "The strategy for the coordinator to assign partitions."));
+
+ public static Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partitions", new ArrayOf(INT32)));
+ public static Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code",
+ INT16),
+ new Field("group_generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("assigned_partitions",
+ new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+
+ public static Schema[] JOIN_GROUP_REQUEST = new Schema[] { JOIN_GROUP_REQUEST_V0 };
+ public static Schema[] JOIN_GROUP_RESPONSE = new Schema[] { JOIN_GROUP_RESPONSE_V0 };
+
+ /* Heartbeat api */
+ public static Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("group_generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."));
+
+ public static Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code",
+ INT16));
+
+ public static Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
+ public static Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+
/* an array of all requests and responses with all schema versions */
public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -113,22 +371,28 @@ public class Protocol {
static {
REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
- REQUESTS[ApiKeys.FETCH.id] = new Schema[] {};
- REQUESTS[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
+ REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
+ REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
- REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
- REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+ REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
+ REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
+ REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
+ REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
+ REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
- RESPONSES[ApiKeys.FETCH.id] = new Schema[] {};
- RESPONSES[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
+ RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
+ RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
- RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
- RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+ RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
+ RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
+ RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
+ RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
+ RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
/* set the maximum version of each api */
for (ApiKeys api : ApiKeys.values())
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 8cecba5..444e69e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -83,6 +83,15 @@ public class Struct {
return getFieldOrDefault(field);
}
+ /**
+ * Check if the struct contains a field.
+ * @param name
+ * @return
+ */
+ public boolean hasField(String name) {
+ return schema.get(name) != null;
+ }
+
public Struct getStruct(Field field) {
return (Struct) get(field);
}
@@ -107,6 +116,22 @@ public class Struct {
return (Integer) get(name);
}
+ public Long getLong(Field field) {
+ return (Long) get(field);
+ }
+
+ public Long getLong(String name) {
+ return (Long) get(name);
+ }
+
+ public ByteBuffer getBytes(Field field) {
+ return (ByteBuffer) get(field);
+ }
+
+ public ByteBuffer getBytes(String name) {
+ return (ByteBuffer) get(name);
+ }
+
public Object[] getArray(Field field) {
return (Object[]) get(field);
}
@@ -253,4 +278,46 @@ public class Struct {
return b.toString();
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ for (int i = 0; i < this.values.length; i++) {
+ Field f = this.schema.get(i);
+ if (f.type() instanceof ArrayOf) {
+ Object[] arrayObject = (Object []) this.get(f);
+ for (Object arrayItem: arrayObject)
+ result = prime * result + arrayItem.hashCode();
+ } else {
+ result = prime * result + this.get(f).hashCode();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Struct other = (Struct) obj;
+ if (schema != other.schema)
+ return false;
+ for (int i = 0; i < this.values.length; i++) {
+ Field f = this.schema.get(i);
+ Boolean result;
+ if (f.type() instanceof ArrayOf) {
+ result = Arrays.equals((Object []) this.get(f), (Object []) other.get(f));
+ } else {
+ result = this.get(f).equals(other.get(f));
+ }
+ if (!result)
+ return false;
+ }
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
new file mode 100644
index 0000000..37aff6c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public abstract class AbstractRequestResponse {
+ protected final Struct struct;
+
+
+ public AbstractRequestResponse(Struct struct) {
+ this.struct = struct;
+ }
+
+ public Struct toStruct() {
+ return struct;
+ }
+
+ /**
+ * Get the serialized size of this object
+ */
+ public int sizeOf() {
+ return struct.sizeOf();
+ }
+
+ /**
+ * Write this object to a buffer
+ */
+ public void writeTo(ByteBuffer buffer) {
+ struct.writeTo(buffer);
+ }
+
+ @Override
+ public String toString() {
+ return struct.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return struct.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AbstractRequestResponse other = (AbstractRequestResponse) obj;
+ return struct.equals(other.struct);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
new file mode 100644
index 0000000..99b52c2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class ConsumerMetadataRequest extends AbstractRequestResponse {
+ public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
+ private static String GROUP_ID_KEY_NAME = "group_id";
+
+ private final String groupId;
+
+ public ConsumerMetadataRequest(String groupId) {
+ super(new Struct(curSchema));
+
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ this.groupId = groupId;
+ }
+
+ public ConsumerMetadataRequest(Struct struct) {
+ super(struct);
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
+ return new ConsumerMetadataRequest(((Struct) curSchema.read(buffer)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
new file mode 100644
index 0000000..8b8f591
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class ConsumerMetadataResponse extends AbstractRequestResponse {
+ private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
+ private static String ERROR_CODE_KEY_NAME = "error_code";
+ private static String COORDINATOR_KEY_NAME = "coordinator";
+
+ // coordinator level field names
+ private static String NODE_ID_KEY_NAME = "node_id";
+ private static String HOST_KEY_NAME = "host";
+ private static String PORT_KEY_NAME = "port";
+
+ private final short errorCode;
+ private final Node node;
+
+ public ConsumerMetadataResponse(short errorCode, Node node) {
+ super(new Struct(curSchema));
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+ coordinator.set(NODE_ID_KEY_NAME, node.id());
+ coordinator.set(HOST_KEY_NAME, node.host());
+ coordinator.set(PORT_KEY_NAME, node.port());
+ struct.set(COORDINATOR_KEY_NAME, coordinator);
+ this.errorCode = errorCode;
+ this.node = node;
+ }
+
+ public ConsumerMetadataResponse(Struct struct) {
+ super(struct);
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+ int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+ String host = broker.getString(HOST_KEY_NAME);
+ int port = broker.getInt(PORT_KEY_NAME);
+ node = new Node(nodeId, host, port);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public Node node() {
+ return node;
+ }
+
+ public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
+ return new ConsumerMetadataResponse(((Struct) curSchema.read(buffer)));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
new file mode 100644
index 0000000..2fc471f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FetchRequest extends AbstractRequestResponse {
+ public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
+ private static String REPLICA_ID_KEY_NAME = "replica_id";
+ private static String MAX_WAIT_KEY_NAME = "max_wait_time";
+ private static String MIN_BYTES_KEY_NAME = "min_bytes";
+ private static String TOPICS_KEY_NAME = "topics";
+
+ // topic level field names
+ private static String TOPIC_KEY_NAME = "topic";
+ private static String PARTITIONS_KEY_NAME = "partitions";
+
+ // partition level field names
+ private static String PARTITION_KEY_NAME = "partition";
+ private static String FETCH_OFFSET_KEY_NAME = "fetch_offset";
+ private static String MAX_BYTES_KEY_NAME = "max_bytes";
+
+ private final int replicaId;
+ private final int maxWait;
+ private final int minBytes;
+ private final Map<TopicPartition, PartitionData> fetchData;
+
+ public static final class PartitionData {
+ public final long offset;
+ public final int maxBytes;
+
+ public PartitionData(long offset, int maxBytes) {
+ this.offset = offset;
+ this.maxBytes = maxBytes;
+ }
+ }
+
+ public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
+ super(new Struct(curSchema));
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
+
+ struct.set(REPLICA_ID_KEY_NAME, replicaId);
+ struct.set(MAX_WAIT_KEY_NAME, maxWait);
+ struct.set(MIN_BYTES_KEY_NAME, minBytes);
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ Struct topicData = struct.instance(TOPICS_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+ PartitionData fetchPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
+ partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+ this.replicaId = replicaId;
+ this.maxWait = maxWait;
+ this.minBytes = minBytes;
+ this.fetchData = fetchData;
+ }
+
+ public FetchRequest(Struct struct) {
+ super(struct);
+ replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+ maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
+ minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
+ fetchData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
+ int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
+ PartitionData partitionData = new PartitionData(offset, maxBytes);
+ fetchData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ }
+
+ public int replicaId() {
+ return replicaId;
+ }
+
+ public int maxWait() {
+ return maxWait;
+ }
+
+ public int minBytes() {
+ return minBytes;
+ }
+
+ public Map<TopicPartition, PartitionData> fetchData() {
+ return fetchData;
+ }
+
+ public static FetchRequest parse(ByteBuffer buffer) {
+ return new FetchRequest(((Struct) curSchema.read(buffer)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
new file mode 100644
index 0000000..f719010
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FetchResponse extends AbstractRequestResponse {
+ public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
+ private static String RESPONSES_KEY_NAME = "responses";
+
+ // topic level field names
+ private static String TOPIC_KEY_NAME = "topic";
+ private static String PARTITIONS_KEY_NAME = "partition_responses";
+
+ // partition level field names
+ private static String PARTITION_KEY_NAME = "partition";
+ private static String ERROR_CODE_KEY_NAME = "error_code";
+ private static String HIGH_WATERMARK_KEY_NAME = "high_watermark";
+ private static String RECORD_SET_KEY_NAME = "record_set";
+
+ private final Map<TopicPartition, PartitionData> responseData;
+
+ public static final class PartitionData {
+ public final short errorCode;
+ public final long highWatermark;
+ public final ByteBuffer recordSet;
+
+ public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) {
+ this.errorCode = errorCode;
+ this.highWatermark = highWatermark;
+ this.recordSet = recordSet;
+ }
+ }
+
+ public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
+ super(new Struct(curSchema));
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+ PartitionData fetchPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+ partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
+ partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+ this.responseData = responseData;
+ }
+
+ public FetchResponse(Struct struct) {
+ super(struct);
+ responseData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+ long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
+ ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+ PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
+ responseData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ }
+
+ public Map<TopicPartition, PartitionData> responseData() {
+ return responseData;
+ }
+
+ public static FetchResponse parse(ByteBuffer buffer) {
+ return new FetchResponse(((Struct) curSchema.read(buffer)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
new file mode 100644
index 0000000..9512db2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class HeartbeatRequest extends AbstractRequestResponse {
+ public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
+ private static String GROUP_ID_KEY_NAME = "group_id";
+ private static String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
+ private static String CONSUMER_ID_KEY_NAME = "consumer_id";
+
+ private final String groupId;
+ private final int groupGenerationId;
+ private final String consumerId;
+
+ public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
+ super(new Struct(curSchema));
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
+ struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ this.groupId = groupId;
+ this.groupGenerationId = groupGenerationId;
+ this.consumerId = consumerId;
+ }
+
+ public HeartbeatRequest(Struct struct) {
+ super(struct);
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
+ consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public int groupGenerationId() {
+ return groupGenerationId;
+ }
+
+ public String consumerId() {
+ return consumerId;
+ }
+
+ public static HeartbeatRequest parse(ByteBuffer buffer) {
+ return new HeartbeatRequest(((Struct) curSchema.read(buffer)));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
new file mode 100644
index 0000000..8997ffc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class HeartbeatResponse extends AbstractRequestResponse {
+ private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
+ private static String ERROR_CODE_KEY_NAME = "error_code";
+
+ private final short errorCode;
+ public HeartbeatResponse(short errorCode) {
+ super(new Struct(curSchema));
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ this.errorCode = errorCode;
+ }
+
+ public HeartbeatResponse(Struct struct) {
+ super(struct);
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public static HeartbeatResponse parse(ByteBuffer buffer) {
+ return new HeartbeatResponse(((Struct) curSchema.read(buffer)));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
new file mode 100644
index 0000000..d6e91f3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class JoinGroupRequest extends AbstractRequestResponse {
+ public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
+ private static String GROUP_ID_KEY_NAME = "group_id";
+ private static String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
+ private static String TOPICS_KEY_NAME = "topics";
+ private static String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static String STRATEGY_KEY_NAME = "partition_assignment_strategy";
+
+ private final String groupId;
+ private final int sessionTimeout;
+ private final List<String> topics;
+ private final String consumerId;
+ private final String strategy;
+
+ public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
+ super(new Struct(curSchema));
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
+ struct.set(TOPICS_KEY_NAME, topics.toArray());
+ struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ struct.set(STRATEGY_KEY_NAME, strategy);
+ this.groupId = groupId;
+ this.sessionTimeout = sessionTimeout;
+ this.topics = topics;
+ this.consumerId = consumerId;
+ this.strategy = strategy;
+ }
+
+ public JoinGroupRequest(Struct struct) {
+ super(struct);
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
+ Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
+ topics = new ArrayList<String>();
+ for (Object topic: topicsArray)
+ topics.add((String) topic);
+ consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ strategy = struct.getString(STRATEGY_KEY_NAME);
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public int sessionTimeout() {
+ return sessionTimeout;
+ }
+
+ public List<String> topics() {
+ return topics;
+ }
+
+ public String consumerId() {
+ return consumerId;
+ }
+
+ public String strategy() {
+ return strategy;
+ }
+
+ public static JoinGroupRequest parse(ByteBuffer buffer) {
+ return new JoinGroupRequest(((Struct) curSchema.read(buffer)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
new file mode 100644
index 0000000..efe8979
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class JoinGroupResponse extends AbstractRequestResponse {
+ public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
+ private static String ERROR_CODE_KEY_NAME = "error_code";
+ private static String GENERATION_ID_KEY_NAME = "group_generation_id";
+ private static String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
+ private static String TOPIC_KEY_NAME = "topic";
+ private static String PARTITIONS_KEY_NAME = "partitions";
+
+ public static int UNKNOWN_GENERATION_ID = -1;
+ public static String UNKNOWN_CONSUMER_ID = "";
+
+ private final short errorCode;
+ private final int generationId;
+ private final String consumerId;
+ private final List<TopicPartition> assignedPartitions;
+
+ public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
+ super(new Struct(curSchema));
+
+ Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
+
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ struct.set(GENERATION_ID_KEY_NAME, generationId);
+ struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) {
+ Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entries.getKey());
+ topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray());
+
+ this.errorCode = errorCode;
+ this.generationId = generationId;
+ this.consumerId = consumerId;
+ this.assignedPartitions = assignedPartitions;
+ }
+
+ public JoinGroupResponse(short errorCode) {
+ this(errorCode, UNKNOWN_GENERATION_ID, UNKNOWN_CONSUMER_ID, Collections.<TopicPartition>emptyList());
+ }
+
+ public JoinGroupResponse(Struct struct) {
+ super(struct);
+ assignedPartitions = new ArrayList<TopicPartition>();
+ for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
+ Struct topicData = (Struct) topicDataObj;
+ String topic = topicData.getString(TOPIC_KEY_NAME);
+ for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME))
+ assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
+ }
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+ consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public int generationId() {
+ return generationId;
+ }
+
+ public String consumerId() {
+ return consumerId;
+ }
+
+ public List<TopicPartition> assignedPartitions() {
+ return assignedPartitions;
+ }
+
+ public static JoinGroupResponse parse(ByteBuffer buffer) {
+ return new JoinGroupResponse(((Struct) curSchema.read(buffer)));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
new file mode 100644
index 0000000..99364c1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ListOffsetRequest extends AbstractRequestResponse {
+ public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
+ private static String REPLICA_ID_KEY_NAME = "replica_id";
+ private static String TOPICS_KEY_NAME = "topics";
+
+ // topic level field names
+ private static String TOPIC_KEY_NAME = "topic";
+ private static String PARTITIONS_KEY_NAME = "partitions";
+
+ // partition level field names
+ private static String PARTITION_KEY_NAME = "partition";
+ private static String TIMESTAMP_KEY_NAME = "timestamp";
+ private static String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
+
+ private final int replicaId;
+ private final Map<TopicPartition, PartitionData> offsetData;
+
+ public static final class PartitionData {
+ public final long timestamp;
+ public final int maxNumOffsets;
+
+ public PartitionData(long timestamp, int maxNumOffsets) {
+ this.timestamp = timestamp;
+ this.maxNumOffsets = maxNumOffsets;
+ }
+ }
+
+ public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
+ super(new Struct(curSchema));
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
+
+ struct.set(REPLICA_ID_KEY_NAME, replicaId);
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ Struct topicData = struct.instance(TOPICS_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+ PartitionData offsetPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
+ partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+ this.replicaId = replicaId;
+ this.offsetData = offsetData;
+ }
+
+ public ListOffsetRequest(Struct struct) {
+ super(struct);
+ replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+ offsetData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+ int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME);
+ PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets);
+ offsetData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ }
+
+ public int replicaId() {
+ return replicaId;
+ }
+
+ public Map<TopicPartition, PartitionData> offsetData() {
+ return offsetData;
+ }
+
+ public static ListOffsetRequest parse(ByteBuffer buffer) {
+ return new ListOffsetRequest(((Struct) curSchema.read(buffer)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
new file mode 100644
index 0000000..ac23971
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ListOffsetResponse extends AbstractRequestResponse {
+ public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
+ private static String RESPONSES_KEY_NAME = "responses";
+
+ // topic level field names
+ private static String TOPIC_KEY_NAME = "topic";
+ private static String PARTITIONS_KEY_NAME = "partition_responses";
+
+ // partition level field names
+ private static String PARTITION_KEY_NAME = "partition";
+ private static String ERROR_CODE_KEY_NAME = "error_code";
+ private static String OFFSETS_KEY_NAME = "offsets";
+
+ private final Map<TopicPartition, PartitionData> responseData;
+
+ public static final class PartitionData {
+ public final short errorCode;
+ public final List<Long> offsets;
+
+ public PartitionData(short errorCode, List<Long> offsets) {
+ this.errorCode = errorCode;
+ this.offsets = offsets;
+ }
+ }
+
+ public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
+ super(new Struct(curSchema));
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+ PartitionData offsetPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode);
+ partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+ this.responseData = responseData;
+ }
+
+ public ListOffsetResponse(Struct struct) {
+ super(struct);
+ responseData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+ Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME);
+ List<Long> offsetsList = new ArrayList<Long>();
+ for (Object offset: offsets)
+ offsetsList.add((Long) offset);
+ PartitionData partitionData = new PartitionData(errorCode, offsetsList);
+ responseData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ }
+
+ public Map<TopicPartition, PartitionData> responseData() {
+ return responseData;
+ }
+
+ public static ListOffsetResponse parse(ByteBuffer buffer) {
+ return new ListOffsetResponse(((Struct) curSchema.read(buffer)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index f35bd87..b22ca1d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -12,26 +12,41 @@
*/
package org.apache.kafka.common.requests;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-public class MetadataRequest {
+public class MetadataRequest extends AbstractRequestResponse {
+ public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
+ private static String TOPICS_KEY_NAME = "topics";
private final List<String> topics;
public MetadataRequest(List<String> topics) {
+ super(new Struct(curSchema));
+ struct.set(TOPICS_KEY_NAME, topics.toArray());
this.topics = topics;
}
- public Struct toStruct() {
- String[] ts = new String[topics.size()];
- topics.toArray(ts);
- Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
- body.set("topics", topics.toArray());
- return body;
+ public MetadataRequest(Struct struct) {
+ super(struct);
+ Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
+ topics = new ArrayList<String>();
+ for (Object topicObj: topicArray) {
+ topics.add((String) topicObj);
+ }
}
+ public List<String> topics() {
+ return topics;
+ }
+
+ public static MetadataRequest parse(ByteBuffer buffer) {
+ return new MetadataRequest(((Struct) curSchema.read(buffer)));
+ }
}