You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:40 UTC
[26/50] [abbrv] kafka git commit: KAFKA-3306: Change metadata response to include required additional fi…
KAFKA-3306: Change metadata response to include required additional fields
- Adds boolean type to the protocol
- Allows protocol arrays to be null (optionally)
- Adds support to ask for no topics in the metadata request
- Adds new fields to the Metadata response protocol
- Adds server code to handle new fields
- Support no-topic metadata requests
- Track controller id in the metadata cache
- Check if a topic is considered internal
- Include rack information if present
- Include all replicas and ISRs, even if node is down
- Adds test code to test new functionality independent of the client
Author: Grant Henke <gr...@gmail.com>
Reviewers: Gwen Shapira, Ismael Juma, Ashish Singh
Closes #1095 from granthenke/metadata-changes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33d745e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33d745e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33d745e2
Branch: refs/heads/0.10.0
Commit: 33d745e2dcfa7a9cac90af5594903330ad774cd2
Parents: 5b375d7
Author: Grant Henke <gr...@gmail.com>
Authored: Tue Apr 26 17:03:18 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 26 17:03:18 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 25 +--
.../kafka/clients/consumer/KafkaConsumer.java | 3 +-
.../clients/consumer/internals/Fetcher.java | 17 +-
.../main/java/org/apache/kafka/common/Node.java | 32 +++-
.../apache/kafka/common/protocol/Protocol.java | 44 ++++-
.../kafka/common/protocol/types/ArrayOf.java | 33 +++-
.../kafka/common/protocol/types/Struct.java | 34 ++--
.../kafka/common/protocol/types/Type.java | 43 ++++-
.../kafka/common/requests/MetadataRequest.java | 40 ++++-
.../kafka/common/requests/MetadataResponse.java | 101 ++++++++++-
.../clients/consumer/internals/FetcherTest.java | 11 +-
.../types/ProtocolSerializationTest.java | 13 +-
.../common/requests/RequestResponseTest.java | 21 ++-
.../src/main/scala/kafka/admin/AdminUtils.scala | 4 +-
.../main/scala/kafka/admin/TopicCommand.scala | 2 +-
core/src/main/scala/kafka/cluster/Broker.scala | 4 +-
core/src/main/scala/kafka/common/Topic.scala | 5 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 65 ++++---
.../main/scala/kafka/server/MetadataCache.scala | 39 +++--
.../scala/kafka/server/ReplicaManager.scala | 4 +-
.../unit/kafka/server/BaseRequestTest.scala | 106 ++++++++++++
.../unit/kafka/server/MetadataCacheTest.scala | 46 ++++-
.../unit/kafka/server/MetadataRequestTest.scala | 168 +++++++++++++++++++
23 files changed, 732 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index cc5dc6f..b134631 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -35,11 +35,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
/**
* A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@@ -53,7 +51,7 @@ public class NetworkClient implements KafkaClient {
/* the selector used to perform network i/o */
private final Selectable selector;
-
+
private final MetadataUpdater metadataUpdater;
private final Random randOffset;
@@ -78,7 +76,7 @@ public class NetworkClient implements KafkaClient {
/* max time in ms for the producer to wait for acknowledgement from server*/
private final int requestTimeoutMs;
-
+
private final Time time;
public NetworkClient(Selectable selector,
@@ -114,7 +112,7 @@ public class NetworkClient implements KafkaClient {
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
int socketSendBuffer,
- int socketReceiveBuffer,
+ int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
@@ -370,7 +368,7 @@ public class NetworkClient implements KafkaClient {
found = node;
}
}
-
+
return found;
}
@@ -546,7 +544,7 @@ public class NetworkClient implements KafkaClient {
// if there is no node available to connect, back off refreshing metadata
long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
waitForMetadataFetch);
-
+
if (metadataTimeout == 0) {
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
@@ -614,8 +612,7 @@ public class NetworkClient implements KafkaClient {
/**
* Create a metadata request for the given topics
*/
- private ClientRequest request(long now, String node, Set<String> topics) {
- MetadataRequest metadata = new MetadataRequest(new ArrayList<>(topics));
+ private ClientRequest request(long now, String node, MetadataRequest metadata) {
RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
return new ClientRequest(now, true, send, null, true);
}
@@ -633,11 +630,15 @@ public class NetworkClient implements KafkaClient {
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId)) {
- Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
this.metadataFetchInProgress = true;
- ClientRequest metadataRequest = request(now, nodeConnectionId, topics);
+ MetadataRequest metadataRequest;
+ if (metadata.needMetadataForAllTopics())
+ metadataRequest = MetadataRequest.allTopics();
+ else
+ metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
+ ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
- doSend(metadataRequest, now);
+ doSend(clientRequest, now);
} else if (connectionStates.canConnect(nodeConnectionId, now)) {
// we don't have a connection to this node right now, make one
log.debug("Initialize connection to node {} for sending metadata request", node.id());
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d9b74e2..ad44d16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.SystemTime;
@@ -1190,7 +1191,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
if (parts != null)
return parts;
- Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topic), requestTimeoutMs);
+ Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topic)), requestTimeoutMs);
return topicMetadata.get(topic);
} finally {
release();
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4985275..f6d3387 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -183,25 +183,26 @@ public class Fetcher<K, V> {
* @return The map of topics with their partition information
*/
public Map<String, List<PartitionInfo>> getAllTopicMetadata(long timeout) {
- return getTopicMetadata(null, timeout);
+ return getTopicMetadata(MetadataRequest.allTopics(), timeout);
}
/**
* Get metadata for all topics present in Kafka cluster
*
- * @param topics The list of topics to fetch or null to fetch all
+ * @param request The MetadataRequest to send
* @param timeout time for which getting topic metadata is attempted
* @return The map of topics with their partition information
*/
- public Map<String, List<PartitionInfo>> getTopicMetadata(List<String> topics, long timeout) {
- if (topics != null && topics.isEmpty())
+ public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest request, long timeout) {
+ // Save the round trip if no topics are requested.
+ if (!request.isAllTopics() && request.topics().isEmpty())
return Collections.emptyMap();
long start = time.milliseconds();
long remaining = timeout;
do {
- RequestFuture<ClientResponse> future = sendMetadataRequest(topics);
+ RequestFuture<ClientResponse> future = sendMetadataRequest(request);
client.poll(future, remaining);
if (future.failed() && !future.isRetriable())
@@ -266,14 +267,12 @@ public class Fetcher<K, V> {
* Send Metadata Request to least loaded node in Kafka cluster asynchronously
* @return A future that indicates result of sent metadata request
*/
- private RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) {
- if (topics == null)
- topics = Collections.emptyList();
+ private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest request) {
final Node node = client.leastLoadedNode();
if (node == null)
return RequestFuture.noBrokersAvailable();
else
- return client.send(node, ApiKeys.METADATA, new MetadataRequest(topics));
+ return client.send(node, ApiKeys.METADATA, request);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java
index 6c3fd0b..f569ddd 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -23,13 +23,19 @@ public class Node {
private final String idString;
private final String host;
private final int port;
+ private final String rack;
public Node(int id, String host, int port) {
+ this(id, host, port, null);
+ }
+
+ public Node(int id, String host, int port, String rack) {
super();
this.id = id;
this.idString = Integer.toString(id);
this.host = host;
this.port = port;
+ this.rack = rack;
}
public static Node noNode() {
@@ -74,6 +80,20 @@ public class Node {
return port;
}
+ /**
+ * True if this node has a defined rack
+ */
+ public boolean hasRack() {
+ return rack != null;
+ }
+
+ /**
+ * The rack for this node
+ */
+ public String rack() {
+ return rack;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -81,6 +101,7 @@ public class Node {
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + id;
result = prime * result + port;
+ result = prime * result + ((rack == null) ? 0 : rack.hashCode());
return result;
}
@@ -102,12 +123,17 @@ public class Node {
return false;
if (port != other.port)
return false;
+ if (rack == null) {
+ if (other.rack != null)
+ return false;
+ } else if (!rack.equals(other.rack))
+ return false;
return true;
}
@Override
public String toString() {
- return host + ":" + port + " (id: " + idString + ")";
+ return host + ":" + port + " (id: " + idString + " rack: " + rack + ")";
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index bf76557..d322095 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
@@ -56,10 +57,13 @@ public class Protocol {
new ArrayOf(STRING),
"An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
- public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
+ public static final Schema METADATA_REQUEST_V1 = new Schema(new Field("topics",
+ ArrayOf.nullable(STRING),
+ "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."));
+
+ public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
new Field("host", STRING, "The hostname of the broker."),
- new Field("port",
- INT32,
+ new Field("port", INT32,
"The port on which the broker accepts requests."));
public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
@@ -87,13 +91,34 @@ public class Protocol {
"Metadata for each partition of the topic."));
public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
- new ArrayOf(BROKER),
+ new ArrayOf(METADATA_BROKER_V0),
"Host and port information for all brokers."),
new Field("topic_metadata",
new ArrayOf(TOPIC_METADATA_V0)));
- public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0};
- public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0};
+ public static final Schema METADATA_BROKER_V1 = new Schema(new Field("node_id", INT32, "The broker id."),
+ new Field("host", STRING, "The hostname of the broker."),
+ new Field("port", INT32,
+ "The port on which the broker accepts requests."),
+ new Field("rack", NULLABLE_STRING, "The rack of the broker."));
+
+ public static final Schema PARTITION_METADATA_V1 = PARTITION_METADATA_V0;
+
+ public static final Schema TOPIC_METADATA_V1 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
+ new Field("topic", STRING, "The name of the topic"),
+ new Field("is_internal", BOOLEAN,
+ "Indicates if the topic is considered a Kafka internal topic"),
+ new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V1),
+ "Metadata for each partition of the topic."));
+
+ public static final Schema METADATA_RESPONSE_V1 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
+ "Host and port information for all brokers."),
+ new Field("controller_id", INT32,
+ "The broker id of the controller broker."),
+ new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
+
+ public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1};
+ public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1};
/* Produce api */
@@ -496,9 +521,14 @@ public class Protocol {
STRING,
"The unique group id."));
+ public static final Schema GROUP_COORDINATOR_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
+ new Field("host", STRING, "The hostname of the broker."),
+ new Field("port", INT32,
+ "The port on which the broker accepts requests."));
+
public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
new Field("coordinator",
- BROKER,
+ GROUP_COORDINATOR_BROKER_V0,
"Host and port information for the coordinator for a consumer group."));
public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0};
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
index a08f876..207f108 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
@@ -24,13 +24,33 @@ import java.nio.ByteBuffer;
public class ArrayOf extends Type {
private final Type type;
+ private final boolean nullable;
public ArrayOf(Type type) {
+ this(type, false);
+ }
+
+ public static ArrayOf nullable(Type type) {
+ return new ArrayOf(type, true);
+ }
+
+ private ArrayOf(Type type, boolean nullable) {
this.type = type;
+ this.nullable = nullable;
+ }
+
+ @Override
+ public boolean isNullable() {
+ return nullable;
}
@Override
public void write(ByteBuffer buffer, Object o) {
+ if (o == null) {
+ buffer.putInt(-1);
+ return;
+ }
+
Object[] objs = (Object[]) o;
int size = objs.length;
buffer.putInt(size);
@@ -41,8 +61,11 @@ public class ArrayOf extends Type {
@Override
public Object read(ByteBuffer buffer) {
int size = buffer.getInt();
- if (size < 0)
+ if (size < 0 && isNullable())
+ return null;
+ else if (size < 0)
throw new SchemaException("Array size " + size + " cannot be negative");
+
if (size > buffer.remaining())
throw new SchemaException("Error reading array of size " + size + ", only " + buffer.remaining() + " bytes available");
Object[] objs = new Object[size];
@@ -53,8 +76,11 @@ public class ArrayOf extends Type {
@Override
public int sizeOf(Object o) {
- Object[] objs = (Object[]) o;
int size = 4;
+ if (o == null)
+ return size;
+
+ Object[] objs = (Object[]) o;
for (int i = 0; i < objs.length; i++)
size += type.sizeOf(objs[i]);
return size;
@@ -72,6 +98,9 @@ public class ArrayOf extends Type {
@Override
public Object[] validate(Object item) {
try {
+ if (isNullable() && item == null)
+ return null;
+
Object[] array = (Object[]) item;
for (int i = 0; i < array.length; i++)
type.validate(array[i]);
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 79f0638..7eee09f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -41,7 +41,7 @@ public class Struct {
/**
* Return the value of the given pre-validated field, or if the value is missing return the default value.
- *
+ *
* @param field The field for which to get the default value
* @throws SchemaException if the field has no value and has no default.
*/
@@ -59,7 +59,7 @@ public class Struct {
/**
* Get the value for the field directly by the field index with no lookup needed (faster!)
- *
+ *
* @param field The field to look up
* @return The value for that field.
* @throws SchemaException if the field has no value and has no default.
@@ -71,7 +71,7 @@ public class Struct {
/**
* Get the record value for the field with the given name by doing a hash table lookup (slower!)
- *
+ *
* @param name The name of the field
* @return The value in the record
* @throws SchemaException If no such field exists
@@ -148,6 +148,14 @@ public class Struct {
return (String) get(name);
}
+ public Boolean getBoolean(Field field) {
+ return (Boolean) get(field);
+ }
+
+ public Boolean getBoolean(String name) {
+ return (Boolean) get(name);
+ }
+
public ByteBuffer getBytes(Field field) {
Object result = get(field);
if (result instanceof byte[])
@@ -164,7 +172,7 @@ public class Struct {
/**
* Set the given field to the specified value
- *
+ *
* @param field The field
* @param value The value
* @throws SchemaException If the validation of the field failed
@@ -177,7 +185,7 @@ public class Struct {
/**
* Set the field specified by the given name to the value
- *
+ *
* @param name The name of the field
* @param value The value to set
* @throws SchemaException If the field is not known
@@ -194,7 +202,7 @@ public class Struct {
* Create a struct for the schema of a container type (struct or array). Note that for array type, this method
* assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be
* instantiated with this method.
- *
+ *
* @param field The field to create an instance of
* @return The struct
* @throws SchemaException If the given field is not a container type
@@ -213,7 +221,7 @@ public class Struct {
/**
* Create a struct instance for the given field which must be a container type (struct or array)
- *
+ *
* @param field The name of the field to create (field must be a schema type)
* @return The struct
* @throws SchemaException If the given field is not a container type
@@ -307,9 +315,11 @@ public class Struct {
for (int i = 0; i < this.values.length; i++) {
Field f = this.schema.get(i);
if (f.type() instanceof ArrayOf) {
- Object[] arrayObject = (Object []) this.get(f);
- for (Object arrayItem: arrayObject)
- result = prime * result + arrayItem.hashCode();
+ if (this.get(f) != null) {
+ Object[] arrayObject = (Object []) this.get(f);
+ for (Object arrayItem: arrayObject)
+ result = prime * result + arrayItem.hashCode();
+ }
} else {
Object field = this.get(f);
if (field != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 92c1f7c..43b4a37 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -59,6 +59,47 @@ public abstract class Type {
return false;
}
+ /**
+ * The Boolean type represents a boolean value in a byte by using
+ * the value of 0 to represent false, and 1 to represent true.
+ *
+ * If for some reason a value that is not 0 or 1 is read,
+ * then any non-zero value will return true.
+ */
+ public static final Type BOOLEAN = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ if ((Boolean) o)
+ buffer.put((byte) 1);
+ else
+ buffer.put((byte) 0);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ byte value = buffer.get();
+ return value != 0;
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 1;
+ }
+
+ @Override
+ public String toString() {
+ return "BOOLEAN";
+ }
+
+ @Override
+ public Boolean validate(Object item) {
+ if (item instanceof Boolean)
+ return (Boolean) item;
+ else
+ throw new SchemaException(item + " is not a Boolean.");
+ }
+ };
+
public static final Type INT8 = new Type() {
@Override
public void write(ByteBuffer buffer, Object o) {
@@ -196,7 +237,7 @@ public abstract class Type {
throw new SchemaException("String length " + length + " cannot be negative");
if (length > buffer.remaining())
throw new SchemaException("Error reading string of length " + length + ", only " + buffer.remaining() + " bytes available");
-
+
byte[] bytes = new byte[length];
buffer.get(bytes);
return Utils.utf8(bytes);
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 92d8c6d..f0cb8fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -25,24 +25,41 @@ import java.util.Collections;
import java.util.List;
public class MetadataRequest extends AbstractRequest {
-
+
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
private static final String TOPICS_KEY_NAME = "topics";
+ private static final MetadataRequest ALL_TOPICS_REQUEST = new MetadataRequest((List<String>) null); // Unusual cast to work around constructor ambiguity
+
private final List<String> topics;
+ public static MetadataRequest allTopics() {
+ return ALL_TOPICS_REQUEST;
+ }
+
+ /**
+ * In v0 null is not allowed and an empty list indicates requesting all topics.
+ * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics.
+ */
public MetadataRequest(List<String> topics) {
super(new Struct(CURRENT_SCHEMA));
- struct.set(TOPICS_KEY_NAME, topics.toArray());
+ if (topics == null)
+ struct.set(TOPICS_KEY_NAME, null);
+ else
+ struct.set(TOPICS_KEY_NAME, topics.toArray());
this.topics = topics;
}
public MetadataRequest(Struct struct) {
super(struct);
Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
- topics = new ArrayList<>();
- for (Object topicObj: topicArray) {
- topics.add((String) topicObj);
+ if (topicArray != null) {
+ topics = new ArrayList<>();
+ for (Object topicObj: topicArray) {
+ topics.add((String) topicObj);
+ }
+ } else {
+ topics = null;
}
}
@@ -52,18 +69,25 @@ public class MetadataRequest extends AbstractRequest {
Errors error = Errors.forException(e);
List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();
- for (String topic : topics)
- topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, partitions));
+ if (topics != null) {
+ for (String topic : topics)
+ topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, false, partitions));
+ }
switch (versionId) {
case 0:
- return new MetadataResponse(Collections.<Node>emptyList(), topicMetadatas);
+ case 1:
+ return new MetadataResponse(Collections.<Node>emptyList(), MetadataResponse.NO_CONTROLLER_ID, topicMetadatas, versionId);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
}
}
+ public boolean isAllTopics() {
+ return topics == null;
+ }
+
public List<String> topics() {
return topics;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 13e0d8f..09a5bee 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -18,7 +18,6 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -32,7 +31,7 @@ import java.util.Set;
public class MetadataResponse extends AbstractRequestResponse {
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
+ private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.METADATA.id);
private static final String BROKERS_KEY_NAME = "brokers";
private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
@@ -40,6 +39,10 @@ public class MetadataResponse extends AbstractRequestResponse {
private static final String NODE_ID_KEY_NAME = "node_id";
private static final String HOST_KEY_NAME = "host";
private static final String PORT_KEY_NAME = "port";
+ private static final String RACK_KEY_NAME = "rack";
+
+ private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
+ public static final int NO_CONTROLLER_ID = -1;
// topic level field names
private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
@@ -54,6 +57,7 @@ public class MetadataResponse extends AbstractRequestResponse {
*/
private static final String TOPIC_KEY_NAME = "topic";
+ private static final String IS_INTERNAL_KEY_NAME = "is_internal";
private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
// partition level field names
@@ -72,13 +76,24 @@ public class MetadataResponse extends AbstractRequestResponse {
private static final String ISR_KEY_NAME = "isr";
private final Collection<Node> brokers;
+ private final Node controller;
private final List<TopicMetadata> topicMetadata;
+ /**
+ * Constructor for the latest version
+ */
+ public MetadataResponse(List<Node> brokers, int controllerId, List<TopicMetadata> topicMetadata) {
+ this(brokers, controllerId, topicMetadata, CURRENT_VERSION);
+ }
- public MetadataResponse(List<Node> brokers, List<TopicMetadata> topicMetadata) {
- super(new Struct(CURRENT_SCHEMA));
+ /**
+ * Constructor for a specific version
+ */
+ public MetadataResponse(List<Node> brokers, int controllerId, List<TopicMetadata> topicMetadata, int version) {
+ super(new Struct(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version)));
this.brokers = brokers;
+ this.controller = getControllerNode(controllerId, brokers);
this.topicMetadata = topicMetadata;
List<Struct> brokerArray = new ArrayList<>();
@@ -87,15 +102,25 @@ public class MetadataResponse extends AbstractRequestResponse {
broker.set(NODE_ID_KEY_NAME, node.id());
broker.set(HOST_KEY_NAME, node.host());
broker.set(PORT_KEY_NAME, node.port());
+ // This field only exists in v1+
+ if (broker.hasField(RACK_KEY_NAME))
+ broker.set(RACK_KEY_NAME, node.rack());
brokerArray.add(broker);
}
struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
+ // This field only exists in v1+
+ if (struct.hasField(CONTROLLER_ID_KEY_NAME))
+ struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+
List<Struct> topicMetadataArray = new ArrayList<>(topicMetadata.size());
for (TopicMetadata metadata : topicMetadata) {
Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME);
topicData.set(TOPIC_KEY_NAME, metadata.topic);
topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code());
+ // This field only exists in v1+
+ if (topicData.hasField(IS_INTERNAL_KEY_NAME))
+ topicData.set(IS_INTERNAL_KEY_NAME, metadata.isInternal());
List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size());
for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
@@ -130,15 +155,28 @@ public class MetadataResponse extends AbstractRequestResponse {
int nodeId = broker.getInt(NODE_ID_KEY_NAME);
String host = broker.getString(HOST_KEY_NAME);
int port = broker.getInt(PORT_KEY_NAME);
- brokers.put(nodeId, new Node(nodeId, host, port));
+ // This field only exists in v1+
+ // A v0 response carries no rack information, so we default to null
+ String rack = broker.hasField(RACK_KEY_NAME) ? broker.getString(RACK_KEY_NAME) : null;
+ brokers.put(nodeId, new Node(nodeId, host, port, rack));
}
+ // This field only exists in v1+
+ // A v0 response carries no controller id, so we default to NO_CONTROLLER_ID
+ int controllerId = NO_CONTROLLER_ID;
+ if (struct.hasField(CONTROLLER_ID_KEY_NAME))
+ controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
+
List<TopicMetadata> topicMetadata = new ArrayList<>();
Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME);
for (int i = 0; i < topicInfos.length; i++) {
Struct topicInfo = (Struct) topicInfos[i];
Errors topicError = Errors.forCode(topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME));
String topic = topicInfo.getString(TOPIC_KEY_NAME);
+ // This field only exists in v1+
+ // A v0 response cannot tell us whether a topic is internal, so we default to false
+ boolean isInternal = topicInfo.hasField(IS_INTERNAL_KEY_NAME) ? topicInfo.getBoolean(IS_INTERNAL_KEY_NAME) : false;
+
List<PartitionMetadata> partitionMetadata = new ArrayList<>();
Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
@@ -149,23 +187,41 @@ public class MetadataResponse extends AbstractRequestResponse {
int leader = partitionInfo.getInt(LEADER_KEY_NAME);
Node leaderNode = leader == -1 ? null : brokers.get(leader);
Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
+
List<Node> replicaNodes = new ArrayList<>(replicas.length);
for (Object replicaNodeId : replicas)
- replicaNodes.add(brokers.get(replicaNodeId));
+ if (brokers.containsKey(replicaNodeId))
+ replicaNodes.add(brokers.get(replicaNodeId));
+ else
+ replicaNodes.add(new Node((int) replicaNodeId, "", -1));
+
Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
List<Node> isrNodes = new ArrayList<>(isr.length);
for (Object isrNode : isr)
- isrNodes.add(brokers.get(isrNode));
+ if (brokers.containsKey(isrNode))
+ isrNodes.add(brokers.get(isrNode));
+ else
+ isrNodes.add(new Node((int) isrNode, "", -1));
+
partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, replicaNodes, isrNodes));
}
- topicMetadata.add(new TopicMetadata(topicError, topic, partitionMetadata));
+ topicMetadata.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadata));
}
this.brokers = brokers.values();
+ this.controller = getControllerNode(controllerId, brokers.values());
this.topicMetadata = topicMetadata;
}
+ private Node getControllerNode(int controllerId, Collection<Node> brokers) {
+ for (Node broker : brokers) {
+ if (broker.id() == controllerId)
+ return broker;
+ }
+ return null;
+ }
+
/**
* Get a map of the topics which had metadata errors
* @return the map
@@ -211,20 +267,43 @@ public class MetadataResponse extends AbstractRequestResponse {
return brokers;
}
+ /**
+ * Get all topic metadata returned in the metadata response
+ * @return the topicMetadata
+ */
+ public Collection<TopicMetadata> topicMetadata() {
+ return topicMetadata;
+ }
+
+ /**
+ * The controller node returned in metadata response
+ * @return the controller node or null if it doesn't exist
+ */
+ public Node controller() {
+ return controller;
+ }
+
public static MetadataResponse parse(ByteBuffer buffer) {
- return new MetadataResponse(CURRENT_SCHEMA.read(buffer));
+ return parse(buffer, CURRENT_VERSION);
+ }
+
+ public static MetadataResponse parse(ByteBuffer buffer, int version) {
+ return new MetadataResponse(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version).read(buffer));
}
public static class TopicMetadata {
private final Errors error;
private final String topic;
+ private final boolean isInternal;
private final List<PartitionMetadata> partitionMetadata;
public TopicMetadata(Errors error,
String topic,
+ boolean isInternal,
List<PartitionMetadata> partitionMetadata) {
this.error = error;
this.topic = topic;
+ this.isInternal = isInternal;
this.partitionMetadata = partitionMetadata;
}
@@ -236,6 +315,10 @@ public class MetadataResponse extends AbstractRequestResponse {
return topic;
}
+ public boolean isInternal() {
+ return isInternal;
+ }
+
public List<PartitionMetadata> partitionMetadata() {
return partitionMetadata;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 9002e81..49bff10 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.MockTime;
@@ -478,14 +479,14 @@ public class FetcherTest {
@Test(expected = InvalidTopicException.class)
public void testGetTopicMetadataInvalidTopic() {
client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION).toStruct());
- fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+ fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
}
@Test
public void testGetTopicMetadataUnknownTopic() {
client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION).toStruct());
- Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+ Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
assertNull(topicMetadata.get(topicName));
}
@@ -494,7 +495,7 @@ public class FetcherTest {
client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE).toStruct());
client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
- Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+ Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
assertTrue(topicMetadata.containsKey(topicName));
}
@@ -570,8 +571,8 @@ public class FetcherTest {
}
}
- MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, partitionsMetadata);
- return new MetadataResponse(cluster.nodes(), Arrays.asList(topicMetadata));
+ MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
+ return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
}
private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 5c34277..e91b2fb 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -33,7 +33,8 @@ public class ProtocolSerializationTest {
@Before
public void setup() {
- this.schema = new Schema(new Field("int8", Type.INT8),
+ this.schema = new Schema(new Field("boolean", Type.BOOLEAN),
+ new Field("int8", Type.INT8),
new Field("int16", Type.INT16),
new Field("int32", Type.INT32),
new Field("int64", Type.INT64),
@@ -42,8 +43,10 @@ public class ProtocolSerializationTest {
new Field("bytes", Type.BYTES),
new Field("nullable_bytes", Type.NULLABLE_BYTES),
new Field("array", new ArrayOf(Type.INT32)),
+ new Field("null_array", ArrayOf.nullable(Type.INT32)),
new Field("struct", new Schema(new Field("field", new ArrayOf(Type.INT32)))));
- this.struct = new Struct(this.schema).set("int8", (byte) 1)
+ this.struct = new Struct(this.schema).set("boolean", true)
+ .set("int8", (byte) 1)
.set("int16", (short) 1)
.set("int32", 1)
.set("int64", 1L)
@@ -51,12 +54,15 @@ public class ProtocolSerializationTest {
.set("nullable_string", null)
.set("bytes", ByteBuffer.wrap("1".getBytes()))
.set("nullable_bytes", null)
- .set("array", new Object[] {1});
+ .set("array", new Object[] {1})
+ .set("null_array", null);
this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] {1, 2, 3}));
}
@Test
public void testSimple() {
+ check(Type.BOOLEAN, false);
+ check(Type.BOOLEAN, true);
check(Type.INT8, (byte) -111);
check(Type.INT16, (short) -11111);
check(Type.INT32, -11111111);
@@ -75,6 +81,7 @@ public class ProtocolSerializationTest {
check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4});
check(new ArrayOf(Type.STRING), new Object[] {});
check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"});
+ check(ArrayOf.nullable(Type.STRING), null);
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 92f3101..0018f53 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -70,9 +70,10 @@ public class RequestResponseTest {
createListOffsetRequest(),
createListOffsetRequest().getErrorResponse(0, new UnknownServerException()),
createListOffsetResponse(),
- createMetadataRequest(),
- createMetadataRequest().getErrorResponse(0, new UnknownServerException()),
- createMetadataResponse(),
+ MetadataRequest.allTopics(),
+ createMetadataRequest(Arrays.asList("topic1")),
+ createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(1, new UnknownServerException()),
+ createMetadataResponse(1),
createOffsetCommitRequest(2),
createOffsetCommitRequest(2).getErrorResponse(2, new UnknownServerException()),
createOffsetCommitResponse(),
@@ -100,6 +101,8 @@ public class RequestResponseTest {
for (AbstractRequestResponse req : requestResponseList)
checkSerialization(req, null);
+ createMetadataResponse(0);
+ createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(0, new UnknownServerException());
checkSerialization(createFetchRequest().getErrorResponse(0, new UnknownServerException()), 0);
checkSerialization(createOffsetCommitRequest(0), 0);
checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
@@ -281,22 +284,22 @@ public class RequestResponseTest {
return new ListOffsetResponse(responseData);
}
- private AbstractRequest createMetadataRequest() {
- return new MetadataRequest(Arrays.asList("topic1"));
+ private AbstractRequest createMetadataRequest(List<String> topics) {
+ return new MetadataRequest(topics);
}
- private AbstractRequestResponse createMetadataResponse() {
+ private AbstractRequestResponse createMetadataResponse(int version) {
Node node = new Node(1, "host1", 1001);
List<Node> replicas = Arrays.asList(node);
List<Node> isr = Arrays.asList(node);
List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
- allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic1",
+ allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
Arrays.asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr))));
- allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2",
+ allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
Collections.<MetadataResponse.PartitionMetadata>emptyList()));
- return new MetadataResponse(Arrays.asList(node), allTopicMetadata);
+ return new MetadataResponse(Arrays.asList(node), MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata, version);
}
private AbstractRequest createOffsetCommitRequest(int version) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 24174be..a8a282e 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -605,10 +605,10 @@ object AdminUtils extends Logging {
new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
}
}
- new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toList.asJava)
+ new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.asJava)
} else {
// topic doesn't exist, send appropriate error code
- new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList())
+ new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic), java.util.Collections.emptyList())
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 232db4a..9f1014f 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -170,7 +170,7 @@ object TopicCommand extends Logging {
}
topics.foreach { topic =>
try {
- if (TopicConstants.INTERNAL_TOPICS.contains(topic)) {
+ if (Topic.isInternal(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
} else {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 77b85e0..61290c1 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -123,7 +123,7 @@ case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint]
def getNode(protocolType: SecurityProtocol): Node = {
val endpoint = endPoints.getOrElse(protocolType,
throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
- new Node(id, endpoint.host, endpoint.port)
+ new Node(id, endpoint.host, endpoint.port, rack.orNull)
}
def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 6067712..054c5eb 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,7 +18,7 @@
package kafka.common
import util.matching.Regex
-import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants.INTERNAL_TOPICS
object Topic {
val legalChars = "[a-zA-Z0-9\\._\\-]"
@@ -62,4 +62,7 @@ object Topic {
topicA.replace('.', '_') == topicB.replace('.', '_')
}
+ def isInternal(topic: String): Boolean =
+ INTERNAL_TOPICS.contains(topic)
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9afefa5..406b1bd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -24,6 +24,7 @@ import java.util.Properties
import kafka.admin.{RackAwareMode, AdminUtils}
import kafka.api._
import kafka.cluster.Partition
+import kafka.common
import kafka.common._
import kafka.controller.KafkaController
import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
@@ -631,12 +632,15 @@ class KafkaApis(val requestChannel: RequestChannel,
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
- new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList())
+ new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
+ java.util.Collections.emptyList())
} catch {
case e: TopicExistsException => // let it go, possibly another broker created this topic
- new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList())
+ new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
+ java.util.Collections.emptyList())
case itex: InvalidTopicException =>
- new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, java.util.Collections.emptyList())
+ new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, common.Topic.isInternal(topic),
+ java.util.Collections.emptyList())
}
}
@@ -656,8 +660,8 @@ class KafkaApis(val requestChannel: RequestChannel,
topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
}
- private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = {
- val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol)
+ private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
+ val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol, errorUnavailableEndpoints)
if (topics.isEmpty || topicResponses.size == topics.size) {
topicResponses
} else {
@@ -668,7 +672,8 @@ class KafkaApis(val requestChannel: RequestChannel,
} else if (config.autoCreateTopicsEnable) {
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
- new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList())
+ new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic),
+ java.util.Collections.emptyList())
}
}
topicResponses ++ responsesForNonExistentTopics
@@ -680,16 +685,24 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = request.body.asInstanceOf[MetadataRequest]
+ val requestVersion = request.header.apiVersion()
- val topics = metadataRequest.topics.asScala.toSet
- var (authorizedTopics, unauthorizedTopics) = if (metadataRequest.topics.isEmpty) {
- //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
- val authorized = metadataCache.getAllTopics()
- .filter(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
- (authorized, mutable.Set[String]())
- } else {
+ val topics =
+ // Handle old metadata request logic. Version 0 has no way to specify "no topics".
+ if (requestVersion == 0) {
+ if (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)
+ metadataCache.getAllTopics()
+ else
+ metadataRequest.topics.asScala.toSet
+ } else {
+ if (metadataRequest.isAllTopics)
+ metadataCache.getAllTopics()
+ else
+ metadataRequest.topics.asScala.toSet
+ }
+
+ var (authorizedTopics, unauthorizedTopics) =
topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
- }
if (authorizedTopics.nonEmpty) {
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
@@ -704,22 +717,32 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedTopicMetadata = unauthorizedTopics.map(topic =>
- new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, java.util.Collections.emptyList()))
+ new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
+ java.util.Collections.emptyList()))
+
+ // In version 0, we returned an error when brokers with replicas were unavailable,
+ // while in higher versions we simply don't include the broker in the returned broker list
+ val errorUnavailableEndpoints = requestVersion == 0
+ val topicMetadata =
+ if (authorizedTopics.isEmpty)
+ Seq.empty[MetadataResponse.TopicMetadata]
+ else
+ getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints)
- val topicMetadata = if (authorizedTopics.isEmpty)
- Seq.empty[MetadataResponse.TopicMetadata]
- else
- getTopicMetadata(authorizedTopics, request.securityProtocol)
+ val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata
val brokers = metadataCache.getAliveBrokers
- trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","),
+ trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
brokers.mkString(","), request.header.correlationId, request.header.clientId))
val responseHeader = new ResponseHeader(request.header.correlationId)
+
val responseBody = new MetadataResponse(
brokers.map(_.getNode(request.securityProtocol)).asJava,
- (topicMetadata ++ unauthorizedTopicMetadata).asJava
+ metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
+ completeTopicMetadata.asJava,
+ requestVersion
)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 06fae42..b387f2e 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -24,11 +24,11 @@ import scala.collection.{Seq, Set, mutable}
import scala.collection.JavaConverters._
import kafka.cluster.{Broker, EndPoint}
import kafka.api._
-import kafka.common.{BrokerEndPointNotAvailableException, TopicAndPartition}
+import kafka.common.{BrokerEndPointNotAvailableException, Topic, TopicAndPartition}
import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
import kafka.utils.CoreUtils._
import kafka.utils.Logging
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.Node
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
@@ -40,16 +40,24 @@ import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest
private[server] class MetadataCache(brokerId: Int) extends Logging {
private val stateChangeLogger = KafkaController.stateChangeLogger
private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]()
+ private var controllerId: Option[Int] = None
private val aliveBrokers = mutable.Map[Int, Broker]()
private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]()
private val partitionMetadataLock = new ReentrantReadWriteLock()
this.logIdent = s"[Kafka Metadata Cache on broker $brokerId] "
- private def getAliveEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol): Seq[Node] = {
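+ // This method is the main hotspot when it comes to the performance of metadata requests,
+ // so we should be careful about adding additional logic here.
+ // filterUnavailableEndpoints exists to support v0 MetadataResponses: when true, unavailable brokers are omitted; otherwise they are returned as placeholder nodes (empty host, port -1)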
+ private def getEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol, filterUnavailableEndpoints: Boolean): Seq[Node] = {
val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size))
brokers.foreach { brokerId =>
- getAliveEndpoint(brokerId, protocol).foreach(result +=)
+ val endpoint = getAliveEndpoint(brokerId, protocol) match {
+ case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None
+ case Some(node) => Some(node)
+ }
+ endpoint.foreach(result +=)
}
result
}
@@ -60,7 +68,8 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not support security protocol `$protocol`"))
}
- private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
+ // errorUnavailableEndpoints exists to support v0 MetadataResponses
+ private def getPartitionMetadata(topic: String, protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
cache.get(topic).map { partitions =>
partitions.map { case (partitionId, partitionState) =>
val topicPartition = TopicAndPartition(topic, partitionId)
@@ -69,7 +78,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, protocol)
val replicas = partitionState.allReplicas
- val replicaInfo = getAliveEndpoints(replicas, protocol)
+ val replicaInfo = getEndpoints(replicas, protocol, errorUnavailableEndpoints)
maybeLeader match {
case None =>
@@ -79,7 +88,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
case Some(leader) =>
val isr = leaderAndIsr.isr
- val isrInfo = getAliveEndpoints(isr, protocol)
+ val isrInfo = getEndpoints(isr, protocol, errorUnavailableEndpoints)
if (replicaInfo.size < replicas.size) {
debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
@@ -101,12 +110,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
}
- def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = {
+ // errorUnavailableEndpoints exists to support v0 MetadataResponses
+ def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
inReadLock(partitionMetadataLock) {
- val topicsRequested = if (topics.isEmpty) cache.keySet else topics
- topicsRequested.toSeq.flatMap { topic =>
- getPartitionMetadata(topic, protocol).map { partitionMetadata =>
- new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toBuffer.asJava)
+ topics.toSeq.flatMap { topic =>
+ getPartitionMetadata(topic, protocol, errorUnavailableEndpoints).map { partitionMetadata =>
+ new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
}
}
}
@@ -151,8 +160,14 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
}
+ def getControllerId: Option[Int] = controllerId
+
def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) {
inWriteLock(partitionMetadataLock) {
+ controllerId = updateMetadataRequest.controllerId match {
+ case id if id < 0 => None
+ case id => Some(id)
+ }
aliveNodes.clear()
aliveBrokers.clear()
updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9bbd29e..888912b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,6 +28,7 @@ import kafka.log.{LogAppendInfo, LogManager}
import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
+import org.I0Itec.zkclient.IZkChildListener
import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException,
InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException,
InvalidTimestampException}
@@ -39,7 +40,6 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time => JTime}
import scala.collection._
import scala.collection.JavaConverters._
-import org.apache.kafka.common.internals.TopicConstants
/*
* Result metadata of a log append operation on the log
@@ -394,7 +394,7 @@ class ReplicaManager(val config: KafkaConfig,
BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
// reject appending to internal topics if it is not allowed
- if (TopicConstants.INTERNAL_TOPICS.contains(topicPartition.topic) && !internalTopicsAllowed) {
+ if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
(topicPartition, LogAppendResult(
LogAppendInfo.UnknownLogAppendInfo,
Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic)))))
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
new file mode 100644
index 0000000..3d05c1d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.Properties
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.network.SocketServer
+import kafka.utils._
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, ResponseHeader}
+import org.junit.Before
+
+abstract class BaseRequestTest extends KafkaServerTestHarness {
+ val numBrokers = 3
+ private var correlationId = 0
+
+ // Override properties by mutating the passed Properties object
+ def propertyOverrides(properties: Properties): Unit
+
+ def generateConfigs() = {
+ val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
+ props.foreach(propertyOverrides)
+ props.map(KafkaConfig.fromProps)
+ }
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
+ }
+
+ def socketServer = {
+ servers.find { server =>
+ val state = server.brokerState.currentState
+ state != NotRunning.state && state != BrokerShuttingDown.state
+ }.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
+ }
+
+ private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
+ new Socket("localhost", s.boundPort(protocol))
+ }
+
+ private def sendRequest(socket: Socket, request: Array[Byte]) {
+ val outgoing = new DataOutputStream(socket.getOutputStream)
+ outgoing.writeInt(request.length)
+ outgoing.write(request)
+ outgoing.flush()
+ }
+
+ private def receiveResponse(socket: Socket): Array[Byte] = {
+ val incoming = new DataInputStream(socket.getInputStream)
+ val len = incoming.readInt()
+ val response = new Array[Byte](len)
+ incoming.readFully(response)
+ response
+ }
+
+ private def requestAndReceive(request: Array[Byte]): Array[Byte] = {
+ val plainSocket = connect()
+ try {
+ sendRequest(plainSocket, request)
+ receiveResponse(plainSocket)
+ } finally {
+ plainSocket.close()
+ }
+ }
+
+ /**
+ * Serializes the request behind a header for the given API key and version, sends it to a live broker, and returns a ByteBuffer holding the response, positioned just past the response header.
+ */
+ def send(request: AbstractRequest, apiKey: ApiKeys, version: Short): ByteBuffer = {
+ correlationId += 1
+ val serializedBytes = {
+ val header = new RequestHeader(apiKey.id, version, "", correlationId)
+ val byteBuffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf)
+ header.writeTo(byteBuffer)
+ request.writeTo(byteBuffer)
+ byteBuffer.array()
+ }
+
+ val response = requestAndReceive(serializedBytes)
+
+ val responseBuffer = ByteBuffer.wrap(response)
+ ResponseHeader.parse(responseBuffer) // Parse the header to ensure it's valid and move the buffer forward
+ responseBuffer
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 017faea..770513c 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -158,7 +158,8 @@ class MetadataCacheTest {
val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava)
cache.updateCache(15, updateMetadataRequest)
- val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
+ // Validate errorUnavailableEndpoints = false
+ val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false)
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
@@ -169,9 +170,25 @@ class MetadataCacheTest {
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
- assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error)
- assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet)
+ assertEquals(Errors.NONE, partitionMetadata.error)
+ assertEquals(Set(0, 1), partitionMetadata.replicas.asScala.map(_.id).toSet)
assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
+
+ // Validate errorUnavailableEndpoints = true
+ val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true)
+ assertEquals(1, topicMetadatasWithError.size)
+
+ val topicMetadataWithError = topicMetadatasWithError.head
+ assertEquals(Errors.NONE, topicMetadataWithError.error)
+
+ val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata
+ assertEquals(1, partitionMetadatasWithError.size)
+
+ val partitionMetadataWithError = partitionMetadatasWithError.get(0)
+ assertEquals(0, partitionMetadataWithError.partition)
+ assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
+ assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
+ assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet)
}
@Test
@@ -197,7 +214,8 @@ class MetadataCacheTest {
val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava)
cache.updateCache(15, updateMetadataRequest)
- val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
+ // Validate errorUnavailableEndpoints = false
+ val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false)
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
@@ -208,9 +226,25 @@ class MetadataCacheTest {
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
- assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error)
+ assertEquals(Errors.NONE, partitionMetadata.error)
assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet)
- assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
+ assertEquals(Set(0, 1), partitionMetadata.isr.asScala.map(_.id).toSet)
+
+ // Validate errorUnavailableEndpoints = true
+ val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true)
+ assertEquals(1, topicMetadatasWithError.size)
+
+ val topicMetadataWithError = topicMetadatasWithError.head
+ assertEquals(Errors.NONE, topicMetadataWithError.error)
+
+ val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata
+ assertEquals(1, partitionMetadatasWithError.size)
+
+ val partitionMetadataWithError = partitionMetadatasWithError.get(0)
+ assertEquals(0, partitionMetadataWithError.partition)
+ assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
+ assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
+ assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet)
}
@Test