You are viewing a plain text version of this content; the canonical link was removed during extraction (the commit itself is at http://git-wip-us.apache.org/repos/asf/kafka/commit/54e54f08).
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/16 23:46:53 UTC
kafka git commit: kafka-2195;
Add versionId to AbstractRequest.getErrorResponse and
AbstractRequest.getRequest; patched by Andrii Biletskyi; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 20a31a29f -> 54e54f080
kafka-2195; Add versionId to AbstractRequest.getErrorResponse and AbstractRequest.getRequest; patched by Andrii Biletskyi; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54e54f08
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54e54f08
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54e54f08
Branch: refs/heads/trunk
Commit: 54e54f08077c9d71a5121e640b55836e6f7f2c9b
Parents: 20a31a2
Author: Andrii Biletskyi <an...@stealth.ly>
Authored: Tue Jun 16 14:46:48 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jun 16 14:46:48 2015 -0700
----------------------------------------------------------------------
.../kafka/common/requests/AbstractRequest.java | 24 +++----
.../requests/ConsumerMetadataRequest.java | 14 +++-
.../kafka/common/requests/FetchRequest.java | 18 +++--
.../kafka/common/requests/HeartbeatRequest.java | 20 ++++--
.../kafka/common/requests/JoinGroupRequest.java | 26 +++++--
.../common/requests/JoinGroupResponse.java | 4 --
.../common/requests/ListOffsetRequest.java | 14 +++-
.../kafka/common/requests/MetadataRequest.java | 21 +++++-
.../kafka/common/requests/MetadataResponse.java | 73 +++++++++-----------
.../common/requests/OffsetCommitRequest.java | 14 +++-
.../common/requests/OffsetFetchRequest.java | 20 ++++--
.../kafka/common/requests/ProduceRequest.java | 17 +++--
.../common/requests/RequestResponseTest.java | 23 +++---
.../scala/kafka/network/RequestChannel.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 2 +-
15 files changed, 192 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 5e5308e..5d3d528 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -28,33 +28,33 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
}
/**
- * Get an error response for a request
+ * Get an error response for a request for a given api version
*/
- public abstract AbstractRequestResponse getErrorResponse(Throwable e);
+ public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e);
/**
* Factory method for getting a request object based on ApiKey ID and a buffer
*/
- public static AbstractRequest getRequest(int requestId, ByteBuffer buffer) {
+ public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
switch (ApiKeys.forId(requestId)) {
case PRODUCE:
- return ProduceRequest.parse(buffer);
+ return ProduceRequest.parse(buffer, versionId);
case FETCH:
- return FetchRequest.parse(buffer);
+ return FetchRequest.parse(buffer, versionId);
case LIST_OFFSETS:
- return ListOffsetRequest.parse(buffer);
+ return ListOffsetRequest.parse(buffer, versionId);
case METADATA:
- return MetadataRequest.parse(buffer);
+ return MetadataRequest.parse(buffer, versionId);
case OFFSET_COMMIT:
- return OffsetCommitRequest.parse(buffer);
+ return OffsetCommitRequest.parse(buffer, versionId);
case OFFSET_FETCH:
- return OffsetFetchRequest.parse(buffer);
+ return OffsetFetchRequest.parse(buffer, versionId);
case CONSUMER_METADATA:
- return ConsumerMetadataRequest.parse(buffer);
+ return ConsumerMetadataRequest.parse(buffer, versionId);
case JOIN_GROUP:
- return JoinGroupRequest.parse(buffer);
+ return JoinGroupRequest.parse(buffer, versionId);
case HEARTBEAT:
- return HeartbeatRequest.parse(buffer);
+ return HeartbeatRequest.parse(buffer, versionId);
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
index 04b90bf..fef22d7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -41,14 +41,24 @@ public class ConsumerMetadataRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
- return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch(versionId) {
+ case 0:
+ return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONSUMER_METADATA.id)));
+ }
}
public String groupId() {
return groupId;
}
+ public static ConsumerMetadataRequest parse(ByteBuffer buffer, int versionId) {
+ return new ConsumerMetadataRequest(ProtoUtils.parseRequest(ApiKeys.CONSUMER_METADATA.id, versionId, buffer));
+ }
+
public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 8686d83..4f52c32 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -120,17 +120,23 @@ public class FetchRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
- FetchResponse.INVALID_HIGHWATERMARK,
- FetchResponse.EMPTY_RECORD_SET);
+ FetchResponse.INVALID_HIGHWATERMARK,
+ FetchResponse.EMPTY_RECORD_SET);
responseData.put(entry.getKey(), partitionResponse);
}
- return new FetchResponse(responseData);
+ switch(versionId) {
+ case 0:
+ return new FetchResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
+ }
}
public int replicaId() {
@@ -149,6 +155,10 @@ public class FetchRequest extends AbstractRequest {
return fetchData;
}
+ public static FetchRequest parse(ByteBuffer buffer, int versionId) {
+ return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
+ }
+
public static FetchRequest parse(ByteBuffer buffer) {
return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 51d081f..d4d4a35 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -48,6 +48,17 @@ public class HeartbeatRequest extends AbstractRequest {
consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
}
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch(versionId) {
+ case 0:
+ return new HeartbeatResponse(Errors.forException(e).code());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)));
+ }
+ }
+
public String groupId() {
return groupId;
}
@@ -60,12 +71,11 @@ public class HeartbeatRequest extends AbstractRequest {
return consumerId;
}
- public static HeartbeatRequest parse(ByteBuffer buffer) {
- return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
+ return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer));
}
- @Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
- return new HeartbeatResponse(Errors.forException(e).code());
+ public static HeartbeatRequest parse(ByteBuffer buffer) {
+ return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 6795682..1ffe076 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -12,6 +12,7 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
@@ -20,6 +21,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
public class JoinGroupRequest extends AbstractRequest {
@@ -65,6 +67,21 @@ public class JoinGroupRequest extends AbstractRequest {
strategy = struct.getString(STRATEGY_KEY_NAME);
}
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ return new JoinGroupResponse(
+ Errors.forException(e).code(),
+ JoinGroupResponse.UNKNOWN_GENERATION_ID,
+ JoinGroupResponse.UNKNOWN_CONSUMER_ID,
+ Collections.<TopicPartition>emptyList());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
+ }
+ }
+
public String groupId() {
return groupId;
}
@@ -85,12 +102,11 @@ public class JoinGroupRequest extends AbstractRequest {
return strategy;
}
- public static JoinGroupRequest parse(ByteBuffer buffer) {
- return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
+ return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer));
}
- @Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
- return new JoinGroupResponse(Errors.forException(e).code());
+ public static JoinGroupRequest parse(ByteBuffer buffer) {
+ return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 8d418cd..7bf544e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -75,10 +75,6 @@ public class JoinGroupResponse extends AbstractRequestResponse {
this.assignedPartitions = assignedPartitions;
}
- public JoinGroupResponse(short errorCode) {
- this(errorCode, UNKNOWN_GENERATION_ID, UNKNOWN_CONSUMER_ID, Collections.<TopicPartition>emptyList());
- }
-
public JoinGroupResponse(Struct struct) {
super(struct);
assignedPartitions = new ArrayList<TopicPartition>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 19267ee..a1bb2b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -107,7 +107,7 @@ public class ListOffsetRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
@@ -115,7 +115,13 @@ public class ListOffsetRequest extends AbstractRequest {
responseData.put(entry.getKey(), partitionResponse);
}
- return new ListOffsetResponse(responseData);
+ switch(versionId) {
+ case 0:
+ return new ListOffsetResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
+ }
}
public int replicaId() {
@@ -126,6 +132,10 @@ public class ListOffsetRequest extends AbstractRequest {
return offsetData;
}
+ public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
+ return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
+ }
+
public static ListOffsetRequest parse(ByteBuffer buffer) {
return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 7e0ce15..f70e8da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -18,6 +18,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
@@ -47,18 +50,30 @@ public class MetadataRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<String, Errors> topicErrors = new HashMap<String, Errors>();
- for (String topic: topics) {
+ for (String topic : topics) {
topicErrors.put(topic, Errors.forException(e));
}
- return new MetadataResponse(topicErrors);
+
+ Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+ switch (versionId) {
+ case 0:
+ return new MetadataResponse(cluster, topicErrors);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
+ }
}
public List<String> topics() {
return topics;
}
+ public static MetadataRequest parse(ByteBuffer buffer, int versionId) {
+ return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer));
+ }
+
public static MetadataRequest parse(ByteBuffer buffer) {
return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 44e2ce6..c8f2d08 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
public class MetadataResponse extends AbstractRequestResponse {
-
+
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
private static final String BROKERS_KEY_NAME = "brokers";
private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
@@ -67,30 +67,15 @@ public class MetadataResponse extends AbstractRequestResponse {
private final Cluster cluster;
private final Map<String, Errors> errors;
- /* Constructor for error responses where most of the data, except error per topic, is irrelevant */
- public MetadataResponse(Map<String, Errors> topicErrors) {
- super(new Struct(CURRENT_SCHEMA));
-
- struct.set(BROKERS_KEY_NAME, new ArrayList<Struct>().toArray());
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Errors> topicError : topicErrors.entrySet()) {
- Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
- topicData.set(TOPIC_ERROR_CODE_KEY_NAME, topicError.getValue().code());
- topicData.set(TOPIC_KEY_NAME, topicError.getKey());
- topicData.set(PARTITION_METADATA_KEY_NAME, new ArrayList<Struct>().toArray());
- topicArray.add(topicData);
- }
- struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
-
- this.errors = topicErrors;
- this.cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
- }
-
- public MetadataResponse(Cluster cluster) {
+ /**
+ * Constructor for MetadataResponse where there are errors for some of the topics,
+ * error data take precedence over cluster information for particular topic
+ */
+ public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
super(new Struct(CURRENT_SCHEMA));
List<Struct> brokerArray = new ArrayList<Struct>();
- for (Node node: cluster.nodes()) {
+ for (Node node : cluster.nodes()) {
Struct broker = struct.instance(BROKERS_KEY_NAME);
broker.set(NODE_ID_KEY_NAME, node.id());
broker.set(HOST_KEY_NAME, node.host());
@@ -100,27 +85,33 @@ public class MetadataResponse extends AbstractRequestResponse {
struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
List<Struct> topicArray = new ArrayList<Struct>();
- for (String topic: cluster.topics()) {
+ for (String topic : cluster.topics()) {
Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
- topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+
topicData.set(TOPIC_KEY_NAME, topic);
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
- Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
- partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
- partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
- partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
- ArrayList<Integer> replicas = new ArrayList<Integer>();
- for (Node node: fetchPartitionData.replicas())
- replicas.add(node.id());
- partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
- ArrayList<Integer> isr = new ArrayList<Integer>();
- for (Node node: fetchPartitionData.inSyncReplicas())
- isr.add(node.id());
- partitionData.set(ISR_KEY_NAME, isr.toArray());
- partitionArray.add(partitionData);
+ if (errors.containsKey(topic)) {
+ topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code());
+ } else {
+ topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
+ Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
+ partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+ partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
+ partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
+ ArrayList<Integer> replicas = new ArrayList<Integer>();
+ for (Node node : fetchPartitionData.replicas())
+ replicas.add(node.id());
+ partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
+ ArrayList<Integer> isr = new ArrayList<Integer>();
+ for (Node node : fetchPartitionData.inSyncReplicas())
+ isr.add(node.id());
+ partitionData.set(ISR_KEY_NAME, isr.toArray());
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
}
- topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+
topicArray.add(topicData);
}
struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
@@ -183,4 +174,4 @@ public class MetadataResponse extends AbstractRequestResponse {
public static MetadataResponse parse(ByteBuffer buffer) {
return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 8bf6cbb..d6e6386 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -217,12 +217,22 @@ public class OffsetCommitRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
responseData.put(entry.getKey(), Errors.forException(e).code());
}
- return new OffsetCommitResponse(responseData);
+
+ switch (versionId) {
+ // OffsetCommitResponseV0 == OffsetCommitResponseV1 == OffsetCommitResponseV2
+ case 0:
+ case 1:
+ case 2:
+ return new OffsetCommitResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_COMMIT.id)));
+ }
}
public String groupId() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index deec1fa..b5e8a0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -88,16 +88,24 @@ public class OffsetFetchRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
for (TopicPartition partition: partitions) {
responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
- OffsetFetchResponse.NO_METADATA,
- Errors.forException(e).code()));
+ OffsetFetchResponse.NO_METADATA,
+ Errors.forException(e).code()));
}
- return new OffsetFetchResponse(responseData);
+ switch (versionId) {
+ // OffsetFetchResponseV0 == OffsetFetchResponseV1
+ case 0:
+ case 1:
+ return new OffsetFetchResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
+ }
}
public String groupId() {
@@ -108,6 +116,10 @@ public class OffsetFetchRequest extends AbstractRequest {
return partitions;
}
+ public static OffsetFetchRequest parse(ByteBuffer buffer, int versionId) {
+ return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer));
+ }
+
public static OffsetFetchRequest parse(ByteBuffer buffer) {
return new OffsetFetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index fabeae3..715504b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -90,19 +90,24 @@ public class ProduceRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
-
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
/* In case the producer doesn't actually want any response */
if (acks == 0)
return null;
Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
- for (Map.Entry<TopicPartition, ByteBuffer> entry: partitionRecords.entrySet()) {
+ for (Map.Entry<TopicPartition, ByteBuffer> entry : partitionRecords.entrySet()) {
responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET));
}
- return new ProduceResponse(responseMap);
+ switch (versionId) {
+ case 0:
+ return new ProduceResponse(responseMap);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
+ }
}
public short acks() {
@@ -117,6 +122,10 @@ public class ProduceRequest extends AbstractRequest {
return partitionRecords;
}
+ public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
+ return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
+ }
+
public static ProduceRequest parse(ByteBuffer buffer) {
return new ProduceRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e3cc196..8b2aca8 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -38,31 +38,31 @@ public class RequestResponseTest {
createRequestHeader(),
createResponseHeader(),
createConsumerMetadataRequest(),
- createConsumerMetadataRequest().getErrorResponse(new UnknownServerException()),
+ createConsumerMetadataRequest().getErrorResponse(0, new UnknownServerException()),
createConsumerMetadataResponse(),
createFetchRequest(),
- createFetchRequest().getErrorResponse(new UnknownServerException()),
+ createFetchRequest().getErrorResponse(0, new UnknownServerException()),
createFetchResponse(),
createHeartBeatRequest(),
- createHeartBeatRequest().getErrorResponse(new UnknownServerException()),
+ createHeartBeatRequest().getErrorResponse(0, new UnknownServerException()),
createHeartBeatResponse(),
createJoinGroupRequest(),
- createJoinGroupRequest().getErrorResponse(new UnknownServerException()),
+ createJoinGroupRequest().getErrorResponse(0, new UnknownServerException()),
createJoinGroupResponse(),
createListOffsetRequest(),
- createListOffsetRequest().getErrorResponse(new UnknownServerException()),
+ createListOffsetRequest().getErrorResponse(0, new UnknownServerException()),
createListOffsetResponse(),
createMetadataRequest(),
- createMetadataRequest().getErrorResponse(new UnknownServerException()),
+ createMetadataRequest().getErrorResponse(0, new UnknownServerException()),
createMetadataResponse(),
createOffsetCommitRequest(),
- createOffsetCommitRequest().getErrorResponse(new UnknownServerException()),
+ createOffsetCommitRequest().getErrorResponse(0, new UnknownServerException()),
createOffsetCommitResponse(),
createOffsetFetchRequest(),
- createOffsetFetchRequest().getErrorResponse(new UnknownServerException()),
+ createOffsetFetchRequest().getErrorResponse(0, new UnknownServerException()),
createOffsetFetchResponse(),
createProduceRequest(),
- createProduceRequest().getErrorResponse(new UnknownServerException()),
+ createProduceRequest().getErrorResponse(0, new UnknownServerException()),
createProduceResponse());
for (AbstractRequestResponse req: requestResponseList) {
@@ -145,7 +145,10 @@ public class RequestResponseTest {
Node[] isr = new Node[1];
isr[0] = node;
Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)));
- return new MetadataResponse(cluster);
+
+ Map<String, Errors> errors = new HashMap<String, Errors>();
+ errors.put("topic2", Errors.LEADER_NOT_AVAILABLE);
+ return new MetadataResponse(cluster, errors);
}
private AbstractRequest createOffsetCommitRequest() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 357d8b9..2074128 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -66,7 +66,7 @@ object RequestChannel extends Logging {
null
val body: AbstractRequest =
if (requestObj == null)
- AbstractRequest.getRequest(header.apiKey, buffer)
+ AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
else
null
http://git-wip-us.apache.org/repos/asf/kafka/blob/54e54f08/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d63bc18..c7debe4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -74,7 +74,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if ( request.requestObj != null)
request.requestObj.handleError(e, requestChannel, request)
else {
- val response = request.body.getErrorResponse(e)
+ val response = request.body.getErrorResponse(request.header.apiVersion, e)
val respHeader = new ResponseHeader(request.header.correlationId)
/* If request doesn't have a default error response, we just close the connection.