Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/27 02:03:25 UTC

[1/2] kafka git commit: KAFKA-3306: Change metadata response to include required additional fields

Repository: kafka
Updated Branches:
  refs/heads/trunk 5b375d7bf -> 33d745e2d


http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
new file mode 100644
index 0000000..3d4b40c
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -0,0 +1,168 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.internals.TopicConstants
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class MetadataRequestTest extends BaseRequestTest {
+
+  override def propertyOverrides(properties: Properties) {
+    properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}")
+  }
+
+  @Test
+  def testControllerId() {
+    val controllerServer = servers.find(_.kafkaController.isActive()).get
+    val controllerId = controllerServer.config.brokerId
+    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+
+    assertEquals("Controller id should match the active controller",
+      controllerId, metadataResponse.controller.id)
+
+    // Fail over the controller
+    controllerServer.shutdown()
+    controllerServer.startup()
+
+    val controllerServer2 = servers.find(_.kafkaController.isActive()).get
+    val controllerId2 = controllerServer2.config.brokerId
+    assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)
+    TestUtils.waitUntilTrue(() => {
+      val metadataResponse2 = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+      controllerServer2.apis.brokerId == metadataResponse2.controller.id
+    }, "Controller id should match the active controller after failover", 5000)
+  }
+
+  @Test
+  def testRack() {
+    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+    // Validate that the rack matches what propertyOverrides() sets above
+    metadataResponse.brokers.asScala.foreach { broker =>
+      assertEquals("Rack information should match config", s"rack/${broker.id}", broker.rack)
+    }
+  }
+
+  @Test
+  def testIsInternal() {
+    val internalTopic = TopicConstants.GROUP_METADATA_TOPIC_NAME
+    val notInternalTopic = "notInternal"
+    // create the topics
+    TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)
+    TestUtils.createTopic(zkUtils, notInternalTopic, 3, 2, servers)
+
+    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+    assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
+
+    val topicMetadata = metadataResponse.topicMetadata.asScala
+    val internalTopicMetadata = topicMetadata.find(_.topic == internalTopic).get
+    val notInternalTopicMetadata = topicMetadata.find(_.topic == notInternalTopic).get
+
+    assertTrue("internalTopic should show isInternal", internalTopicMetadata.isInternal)
+    assertFalse("notInternalTopic topic not should show isInternal", notInternalTopicMetadata.isInternal)
+  }
+
+  @Test
+  def testNoTopicsRequest() {
+    // create some topics
+    TestUtils.createTopic(zkUtils, "t1", 3, 2, servers)
+    TestUtils.createTopic(zkUtils, "t2", 3, 2, servers)
+
+    // v0 doesn't support a "no topics" request
+    // v1: an empty list represents "no topics"
+    val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava), 1)
+    assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
+    assertTrue("Response should have no topics", metadataResponse.topicMetadata.isEmpty)
+  }
+
+  @Test
+  def testAllTopicsRequest() {
+    // create some topics
+    TestUtils.createTopic(zkUtils, "t1", 3, 2, servers)
+    TestUtils.createTopic(zkUtils, "t2", 3, 2, servers)
+
+    // v0: an empty list represents all topics
+    val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava), 0)
+    assertTrue("V0 Response should have no errors", metadataResponseV0.errors.isEmpty)
+    assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size())
+
+    // v1: null represents all topics
+    val metadataResponseV1 = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+    assertTrue("V1 Response should have no errors", metadataResponseV1.errors.isEmpty)
+    assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size())
+  }
+
+  @Test
+  def testReplicaDownResponse() {
+    val replicaDownTopic = "replicaDown"
+    val replicaCount = 3
+
+    // create a topic with 3 replicas
+    TestUtils.createTopic(zkUtils, replicaDownTopic, 1, replicaCount, servers)
+
+    // Kill a replica node that is not the leader
+    val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
+    val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+    val downNode = servers.find { server =>
+      val serverId = server.apis.brokerId
+      val leaderId = partitionMetadata.leader.id
+      val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
+      serverId != leaderId && replicaIds.contains(serverId)
+    }.get
+    downNode.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
+      val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
+      val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get
+      replica.host == "" && replica.port == -1
+    }, "Replica was not found down", 5000)
+
+    // Validate version 0 still filters unavailable replicas and contains error
+    val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 0)
+    val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
+    assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty)
+    assertFalse(s"The downed broker should not be in the brokers list", v0BrokerIds.contains(downNode))
+    assertTrue("Response should have one topic", v0MetadataResponse.topicMetadata.size == 1)
+    val v0PartitionMetadata = v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+    assertTrue("PartitionMetadata should have an error", v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE)
+    assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicas.size == replicaCount - 1)
+
+    // Validate version 1 returns unavailable replicas with no error
+    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
+    val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
+    assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
+    assertFalse(s"The downed broker should not be in the brokers list", v1BrokerIds.contains(downNode))
+    assertEquals("Response should have one topic", 1, v1MetadataResponse.topicMetadata.size)
+    val v1PartitionMetadata = v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+    assertEquals("PartitionMetadata should have no errors", Errors.NONE, v1PartitionMetadata.error)
+    assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicas.size)
+  }
+
+  private def sendMetadataRequest(request: MetadataRequest, version: Short): MetadataResponse = {
+    val response = send(request, ApiKeys.METADATA, version)
+    MetadataResponse.parse(response, version)
+  }
+}


[2/2] kafka git commit: KAFKA-3306: Change metadata response to include required additional fields

Posted by gw...@apache.org.
KAFKA-3306: Change metadata response to include required additional fields

- Adds boolean type to the protocol
- Allows protocol arrays to be null (optionally)
- Adds support to ask for no topics in the metadata request
- Adds new fields to the Metadata response protocol
- Adds server code to handle new fields
   - Support no-topic metadata requests
   - Track controller id in the metadata cache
   - Check if a topic is considered internal
   - Include rack information if present
   - Include all replicas and ISRs, even if node is down
- Adds test code to test new functionality independent of the client

Author: Grant Henke <gr...@gmail.com>

Reviewers: Gwen Shapira, Ismael Juma, Ashish Singh

Closes #1095 from granthenke/metadata-changes
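
The first bullets above change request semantics between versions: in v0 an
empty topic array means "all topics", while in v1 an empty array means "no
topics" and a null array means "all topics". A minimal sketch of the new
client-side API, using only the MetadataRequest class touched by this commit
(the wrapper class and main method are illustrative scaffolding):

import java.util.Collections;

import org.apache.kafka.common.requests.MetadataRequest;

public class MetadataRequestSemantics {
    public static void main(String[] args) {
        // v1: a null topic array asks for metadata on all topics
        MetadataRequest all = MetadataRequest.allTopics();
        System.out.println(all.isAllTopics());       // true
        System.out.println(all.topics());            // null

        // v1: an empty list asks for no topics, which is useful when only
        // broker, rack, or controller information is needed
        MetadataRequest none = new MetadataRequest(Collections.<String>emptyList());
        System.out.println(none.isAllTopics());      // false
        System.out.println(none.topics().isEmpty()); // true
    }
}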


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33d745e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33d745e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33d745e2

Branch: refs/heads/trunk
Commit: 33d745e2dcfa7a9cac90af5594903330ad774cd2
Parents: 5b375d7
Author: Grant Henke <gr...@gmail.com>
Authored: Tue Apr 26 17:03:18 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 26 17:03:18 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |  25 +--
 .../kafka/clients/consumer/KafkaConsumer.java   |   3 +-
 .../clients/consumer/internals/Fetcher.java     |  17 +-
 .../main/java/org/apache/kafka/common/Node.java |  32 +++-
 .../apache/kafka/common/protocol/Protocol.java  |  44 ++++-
 .../kafka/common/protocol/types/ArrayOf.java    |  33 +++-
 .../kafka/common/protocol/types/Struct.java     |  34 ++--
 .../kafka/common/protocol/types/Type.java       |  43 ++++-
 .../kafka/common/requests/MetadataRequest.java  |  40 ++++-
 .../kafka/common/requests/MetadataResponse.java | 101 ++++++++++-
 .../clients/consumer/internals/FetcherTest.java |  11 +-
 .../types/ProtocolSerializationTest.java        |  13 +-
 .../common/requests/RequestResponseTest.java    |  21 ++-
 .../src/main/scala/kafka/admin/AdminUtils.scala |   4 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |   2 +-
 core/src/main/scala/kafka/cluster/Broker.scala  |   4 +-
 core/src/main/scala/kafka/common/Topic.scala    |   5 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  65 ++++---
 .../main/scala/kafka/server/MetadataCache.scala |  39 +++--
 .../scala/kafka/server/ReplicaManager.scala     |   4 +-
 .../unit/kafka/server/BaseRequestTest.scala     | 106 ++++++++++++
 .../unit/kafka/server/MetadataCacheTest.scala   |  46 ++++-
 .../unit/kafka/server/MetadataRequestTest.scala | 168 +++++++++++++++++++
 23 files changed, 732 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index cc5dc6f..b134631 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -35,11 +35,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 
 /**
  * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@@ -53,7 +51,7 @@ public class NetworkClient implements KafkaClient {
 
     /* the selector used to perform network i/o */
     private final Selectable selector;
-    
+
     private final MetadataUpdater metadataUpdater;
 
     private final Random randOffset;
@@ -78,7 +76,7 @@ public class NetworkClient implements KafkaClient {
 
     /* max time in ms for the producer to wait for acknowledgement from server*/
     private final int requestTimeoutMs;
-    
+
     private final Time time;
 
     public NetworkClient(Selectable selector,
@@ -114,7 +112,7 @@ public class NetworkClient implements KafkaClient {
                           int maxInFlightRequestsPerConnection,
                           long reconnectBackoffMs,
                           int socketSendBuffer,
-                          int socketReceiveBuffer, 
+                          int socketReceiveBuffer,
                           int requestTimeoutMs,
                           Time time) {
 
@@ -370,7 +368,7 @@ public class NetworkClient implements KafkaClient {
                 found = node;
             }
         }
-        
+
         return found;
     }
 
@@ -546,7 +544,7 @@ public class NetworkClient implements KafkaClient {
             // if there is no node available to connect, back off refreshing metadata
             long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                     waitForMetadataFetch);
- 
+
             if (metadataTimeout == 0) {
                 // Beware that the behavior of this method and the computation of timeouts for poll() are
                 // highly dependent on the behavior of leastLoadedNode.
@@ -614,8 +612,7 @@ public class NetworkClient implements KafkaClient {
         /**
          * Create a metadata request for the given topics
          */
-        private ClientRequest request(long now, String node, Set<String> topics) {
-            MetadataRequest metadata = new MetadataRequest(new ArrayList<>(topics));
+        private ClientRequest request(long now, String node, MetadataRequest metadata) {
             RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
             return new ClientRequest(now, true, send, null, true);
         }
@@ -633,11 +630,15 @@ public class NetworkClient implements KafkaClient {
             String nodeConnectionId = node.idString();
 
             if (canSendRequest(nodeConnectionId)) {
-                Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
                 this.metadataFetchInProgress = true;
-                ClientRequest metadataRequest = request(now, nodeConnectionId, topics);
+                MetadataRequest metadataRequest;
+                if (metadata.needMetadataForAllTopics())
+                    metadataRequest = MetadataRequest.allTopics();
+                else
+                    metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
+                ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
-                doSend(metadataRequest, now);
+                doSend(clientRequest, now);
             } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                 // we don't have a connection to this node right now, make one
                 log.debug("Initialize connection to node {} for sending metadata request", node.id());

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d9b74e2..ad44d16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.SystemTime;
@@ -1190,7 +1191,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (parts != null)
                 return parts;
 
-            Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topic), requestTimeoutMs);
+            Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topic)), requestTimeoutMs);
             return topicMetadata.get(topic);
         } finally {
             release();

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4985275..f6d3387 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -183,25 +183,26 @@ public class Fetcher<K, V> {
      * @return The map of topics with their partition information
      */
     public Map<String, List<PartitionInfo>> getAllTopicMetadata(long timeout) {
-        return getTopicMetadata(null, timeout);
+        return getTopicMetadata(MetadataRequest.allTopics(), timeout);
     }
 
     /**
      * Get metadata for all topics present in Kafka cluster
      *
-     * @param topics The list of topics to fetch or null to fetch all
+     * @param request The MetadataRequest to send
      * @param timeout time for which getting topic metadata is attempted
      * @return The map of topics with their partition information
      */
-    public Map<String, List<PartitionInfo>> getTopicMetadata(List<String> topics, long timeout) {
-        if (topics != null && topics.isEmpty())
+    public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest request, long timeout) {
+        // Save the round trip if no topics are requested.
+        if (!request.isAllTopics() && request.topics().isEmpty())
             return Collections.emptyMap();
 
         long start = time.milliseconds();
         long remaining = timeout;
 
         do {
-            RequestFuture<ClientResponse> future = sendMetadataRequest(topics);
+            RequestFuture<ClientResponse> future = sendMetadataRequest(request);
             client.poll(future, remaining);
 
             if (future.failed() && !future.isRetriable())
@@ -266,14 +267,12 @@ public class Fetcher<K, V> {
      * Send Metadata Request to least loaded node in Kafka cluster asynchronously
      * @return A future that indicates result of sent metadata request
      */
-    private RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) {
-        if (topics == null)
-            topics = Collections.emptyList();
+    private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest request) {
         final Node node = client.leastLoadedNode();
         if (node == null)
             return RequestFuture.noBrokersAvailable();
         else
-            return client.send(node, ApiKeys.METADATA, new MetadataRequest(topics));
+            return client.send(node, ApiKeys.METADATA, request);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java
index 6c3fd0b..f569ddd 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -23,13 +23,19 @@ public class Node {
     private final String idString;
     private final String host;
     private final int port;
+    private final String rack;
 
     public Node(int id, String host, int port) {
+        this(id, host, port, null);
+    }
+
+    public Node(int id, String host, int port, String rack) {
         super();
         this.id = id;
         this.idString = Integer.toString(id);
         this.host = host;
         this.port = port;
+        this.rack = rack;
     }
 
     public static Node noNode() {
@@ -74,6 +80,20 @@ public class Node {
         return port;
     }
 
+    /**
+     * True if this node has a defined rack
+     */
+    public boolean hasRack() {
+        return rack != null;
+    }
+
+    /**
+     * The rack for this node
+     */
+    public String rack() {
+        return rack;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -81,6 +101,7 @@ public class Node {
         result = prime * result + ((host == null) ? 0 : host.hashCode());
         result = prime * result + id;
         result = prime * result + port;
+        result = prime * result + ((rack == null) ? 0 : rack.hashCode());
         return result;
     }
 
@@ -102,12 +123,17 @@ public class Node {
             return false;
         if (port != other.port)
             return false;
+        if (rack == null) {
+            if (other.rack != null)
+                return false;
+        } else if (!rack.equals(other.rack))
+            return false;
         return true;
     }
 
     @Override
     public String toString() {
-        return host + ":" + port + " (id: " + idString + ")";
+        return host + ":" + port + " (id: " + idString + " rack: " + rack + ")";
     }
 
 }
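
The rack field added above defaults to null via the three-argument constructor
and participates in equals() and hashCode(). A short sketch of the new
accessors; the host name and rack id are made up for illustration:

import org.apache.kafka.common.Node;

public class NodeRackExample {
    public static void main(String[] args) {
        Node plain = new Node(1, "broker1.example.com", 9092);                // rack defaults to null
        Node racked = new Node(1, "broker1.example.com", 9092, "us-east-1a"); // hypothetical rack id

        System.out.println(plain.hasRack());      // false
        System.out.println(racked.rack());        // us-east-1a
        System.out.println(plain.equals(racked)); // false: rack is part of equality
    }
}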

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index bf76557..d322095 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
 import static org.apache.kafka.common.protocol.types.Type.INT16;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
@@ -56,10 +57,13 @@ public class Protocol {
                                                                           new ArrayOf(STRING),
                                                                           "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
 
-    public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
+    public static final Schema METADATA_REQUEST_V1 = new Schema(new Field("topics",
+                                                                          ArrayOf.nullable(STRING),
+                                                                          "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."));
+
+    public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
                                                    new Field("host", STRING, "The hostname of the broker."),
-                                                   new Field("port",
-                                                             INT32,
+                                                   new Field("port", INT32,
                                                              "The port on which the broker accepts requests."));
 
     public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
@@ -87,13 +91,34 @@ public class Protocol {
                                                                         "Metadata for each partition of the topic."));
 
     public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
-                                                                           new ArrayOf(BROKER),
+                                                                           new ArrayOf(METADATA_BROKER_V0),
                                                                            "Host and port information for all brokers."),
                                                                  new Field("topic_metadata",
                                                                            new ArrayOf(TOPIC_METADATA_V0)));
 
-    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0};
-    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0};
+    public static final Schema METADATA_BROKER_V1 = new Schema(new Field("node_id", INT32, "The broker id."),
+                                                      new Field("host", STRING, "The hostname of the broker."),
+                                                      new Field("port", INT32,
+                                                        "The port on which the broker accepts requests."),
+                                                      new Field("rack", NULLABLE_STRING, "The rack of the broker."));
+
+    public static final Schema PARTITION_METADATA_V1 = PARTITION_METADATA_V0;
+
+    public static final Schema TOPIC_METADATA_V1 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
+                                                              new Field("topic", STRING, "The name of the topic"),
+                                                              new Field("is_internal", BOOLEAN,
+                                                                  "Indicates if the topic is considered a Kafka internal topic"),
+                                                              new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V1),
+                                                                "Metadata for each partition of the topic."));
+
+    public static final Schema METADATA_RESPONSE_V1 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
+                                                                    "Host and port information for all brokers."),
+                                                                 new Field("controller_id", INT32,
+                                                                     "The broker id of the controller broker."),
+                                                                 new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
+
+    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1};
+    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1};
 
     /* Produce api */
 
@@ -496,9 +521,14 @@ public class Protocol {
                                                                                    STRING,
                                                                                    "The unique group id."));
 
+    public static final Schema GROUP_COORDINATOR_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
+                                                                        new Field("host", STRING, "The hostname of the broker."),
+                                                                        new Field("port", INT32,
+                                                                            "The port on which the broker accepts requests."));
+
     public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
                                                                           new Field("coordinator",
-                                                                                    BROKER,
+                                                                                    GROUP_COORDINATOR_BROKER_V0,
                                                                                     "Host and port information for the coordinator for a consumer group."));
 
     public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0};

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
index a08f876..207f108 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
@@ -24,13 +24,33 @@ import java.nio.ByteBuffer;
 public class ArrayOf extends Type {
 
     private final Type type;
+    private final boolean nullable;
 
     public ArrayOf(Type type) {
+        this(type, false);
+    }
+
+    public static ArrayOf nullable(Type type) {
+        return new ArrayOf(type, true);
+    }
+
+    private ArrayOf(Type type, boolean nullable) {
         this.type = type;
+        this.nullable = nullable;
+    }
+
+    @Override
+    public boolean isNullable() {
+        return nullable;
     }
 
     @Override
     public void write(ByteBuffer buffer, Object o) {
+        if (o == null) {
+            buffer.putInt(-1);
+            return;
+        }
+
         Object[] objs = (Object[]) o;
         int size = objs.length;
         buffer.putInt(size);
@@ -41,8 +61,11 @@ public class ArrayOf extends Type {
     @Override
     public Object read(ByteBuffer buffer) {
         int size = buffer.getInt();
-        if (size < 0)
+        if (size < 0 && isNullable())
+            return null;
+        else if (size < 0)
             throw new SchemaException("Array size " + size + " cannot be negative");
+
         if (size > buffer.remaining())
             throw new SchemaException("Error reading array of size " + size + ", only " + buffer.remaining() + " bytes available");
         Object[] objs = new Object[size];
@@ -53,8 +76,11 @@ public class ArrayOf extends Type {
 
     @Override
     public int sizeOf(Object o) {
-        Object[] objs = (Object[]) o;
         int size = 4;
+        if (o == null)
+            return size;
+
+        Object[] objs = (Object[]) o;
         for (int i = 0; i < objs.length; i++)
             size += type.sizeOf(objs[i]);
         return size;
@@ -72,6 +98,9 @@ public class ArrayOf extends Type {
     @Override
     public Object[] validate(Object item) {
         try {
+            if (isNullable() && item == null)
+                return null;
+
             Object[] array = (Object[]) item;
             for (int i = 0; i < array.length; i++)
                 type.validate(array[i]);
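
On the wire, a nullable array keeps the 4-byte size prefix and uses -1 to
stand for null; only instances created via ArrayOf.nullable() accept that
value on read. A minimal round-trip sketch of the encoding (the wrapper class
is illustrative):

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Type;

public class NullableArrayExample {
    public static void main(String[] args) {
        ArrayOf nullableInts = ArrayOf.nullable(Type.INT32);

        ByteBuffer buffer = ByteBuffer.allocate(nullableInts.sizeOf(null)); // 4 bytes: just the size prefix
        nullableInts.write(buffer, null); // writes -1 as the array size
        buffer.flip();

        System.out.println(nullableInts.read(buffer)); // null
        // A non-nullable new ArrayOf(Type.INT32) reading the same bytes
        // would throw a SchemaException for the negative size
    }
}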

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 79f0638..7eee09f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -41,7 +41,7 @@ public class Struct {
 
     /**
      * Return the value of the given pre-validated field, or if the value is missing return the default value.
-     * 
+     *
      * @param field The field for which to get the default value
      * @throws SchemaException if the field has no value and has no default.
      */
@@ -59,7 +59,7 @@ public class Struct {
 
     /**
      * Get the value for the field directly by the field index with no lookup needed (faster!)
-     * 
+     *
      * @param field The field to look up
      * @return The value for that field.
      * @throws SchemaException if the field has no value and has no default.
@@ -71,7 +71,7 @@ public class Struct {
 
     /**
      * Get the record value for the field with the given name by doing a hash table lookup (slower!)
-     * 
+     *
      * @param name The name of the field
      * @return The value in the record
      * @throws SchemaException If no such field exists
@@ -148,6 +148,14 @@ public class Struct {
         return (String) get(name);
     }
 
+    public Boolean getBoolean(Field field) {
+        return (Boolean) get(field);
+    }
+
+    public Boolean getBoolean(String name) {
+        return (Boolean) get(name);
+    }
+
     public ByteBuffer getBytes(Field field) {
         Object result = get(field);
         if (result instanceof byte[])
@@ -164,7 +172,7 @@ public class Struct {
 
     /**
      * Set the given field to the specified value
-     * 
+     *
      * @param field The field
      * @param value The value
      * @throws SchemaException If the validation of the field failed
@@ -177,7 +185,7 @@ public class Struct {
 
     /**
      * Set the field specified by the given name to the value
-     * 
+     *
      * @param name The name of the field
      * @param value The value to set
      * @throws SchemaException If the field is not known
@@ -194,7 +202,7 @@ public class Struct {
      * Create a struct for the schema of a container type (struct or array). Note that for array type, this method
      * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be
      * instantiated with this method.
-     * 
+     *
      * @param field The field to create an instance of
      * @return The struct
      * @throws SchemaException If the given field is not a container type
@@ -213,7 +221,7 @@ public class Struct {
 
     /**
      * Create a struct instance for the given field which must be a container type (struct or array)
-     * 
+     *
      * @param field The name of the field to create (field must be a schema type)
      * @return The struct
      * @throws SchemaException If the given field is not a container type
@@ -307,9 +315,11 @@ public class Struct {
         for (int i = 0; i < this.values.length; i++) {
             Field f = this.schema.get(i);
             if (f.type() instanceof ArrayOf) {
-                Object[] arrayObject = (Object []) this.get(f);
-                for (Object arrayItem: arrayObject)
-                    result = prime * result + arrayItem.hashCode();
+                if (this.get(f) != null) {
+                    Object[] arrayObject = (Object []) this.get(f);
+                    for (Object arrayItem: arrayObject)
+                        result = prime * result + arrayItem.hashCode();
+                }
             } else {
                 Object field = this.get(f);
                 if (field != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 92c1f7c..43b4a37 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -59,6 +59,47 @@ public abstract class Type {
         return false;
     }
 
+    /**
+     * The Boolean type represents a boolean value in a byte by using
+     * the value of 0 to represent false, and 1 to represent true.
+     *
+     * If a value other than 0 or 1 is read,
+     * any non-zero value is treated as true.
+     */
+    public static final Type BOOLEAN = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            if ((Boolean) o)
+                buffer.put((byte) 1);
+            else
+                buffer.put((byte) 0);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            byte value = buffer.get();
+            return value != 0;
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 1;
+        }
+
+        @Override
+        public String toString() {
+            return "BOOLEAN";
+        }
+
+        @Override
+        public Boolean validate(Object item) {
+            if (item instanceof Boolean)
+                return (Boolean) item;
+            else
+                throw new SchemaException(item + " is not a Boolean.");
+        }
+    };
+
     public static final Type INT8 = new Type() {
         @Override
         public void write(ByteBuffer buffer, Object o) {
@@ -196,7 +237,7 @@ public abstract class Type {
                 throw new SchemaException("String length " + length + " cannot be negative");
             if (length > buffer.remaining())
                 throw new SchemaException("Error reading string of length " + length + ", only " + buffer.remaining() + " bytes available");
-            
+
             byte[] bytes = new byte[length];
             buffer.get(bytes);
             return Utils.utf8(bytes);
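
The new BOOLEAN type occupies a single byte: 1 for true, 0 for false, and any
non-zero byte reads back as true. A one-byte round trip, as a sketch:

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.types.Type;

public class BooleanTypeExample {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(Type.BOOLEAN.sizeOf(true)); // always 1 byte
        Type.BOOLEAN.write(buffer, true); // encoded as (byte) 1
        buffer.flip();
        System.out.println(Type.BOOLEAN.read(buffer)); // true
    }
}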

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 92d8c6d..f0cb8fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -25,24 +25,41 @@ import java.util.Collections;
 import java.util.List;
 
 public class MetadataRequest extends AbstractRequest {
-    
+
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
     private static final String TOPICS_KEY_NAME = "topics";
 
+    private static final MetadataRequest ALL_TOPICS_REQUEST = new MetadataRequest((List<String>) null); // Unusual cast to work around constructor ambiguity
+
     private final List<String> topics;
 
+    public static MetadataRequest allTopics() {
+        return ALL_TOPICS_REQUEST;
+    }
+
+    /**
+     * In v0 null is not allowed and an empty list indicates requesting all topics.
+     * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics.
+     */
     public MetadataRequest(List<String> topics) {
         super(new Struct(CURRENT_SCHEMA));
-        struct.set(TOPICS_KEY_NAME, topics.toArray());
+        if (topics == null)
+            struct.set(TOPICS_KEY_NAME, null);
+        else
+            struct.set(TOPICS_KEY_NAME, topics.toArray());
         this.topics = topics;
     }
 
     public MetadataRequest(Struct struct) {
         super(struct);
         Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
-        topics = new ArrayList<>();
-        for (Object topicObj: topicArray) {
-            topics.add((String) topicObj);
+        if (topicArray != null) {
+            topics = new ArrayList<>();
+            for (Object topicObj: topicArray) {
+                topics.add((String) topicObj);
+            }
+        } else {
+            topics = null;
         }
     }
 
@@ -52,18 +69,25 @@ public class MetadataRequest extends AbstractRequest {
         Errors error = Errors.forException(e);
         List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();
 
-        for (String topic : topics)
-            topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, partitions));
+        if (topics != null) {
+            for (String topic : topics)
+                topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, false, partitions));
+        }
 
         switch (versionId) {
             case 0:
-                return new MetadataResponse(Collections.<Node>emptyList(), topicMetadatas);
+            case 1:
+                return new MetadataResponse(Collections.<Node>emptyList(), MetadataResponse.NO_CONTROLLER_ID, topicMetadatas, versionId);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
         }
     }
 
+    public boolean isAllTopics() {
+        return topics == null;
+    }
+
     public List<String> topics() {
         return topics;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 13e0d8f..09a5bee 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -18,7 +18,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -32,7 +31,7 @@ import java.util.Set;
 
 public class MetadataResponse extends AbstractRequestResponse {
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
+    private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.METADATA.id);
     private static final String BROKERS_KEY_NAME = "brokers";
     private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
 
@@ -40,6 +39,10 @@ public class MetadataResponse extends AbstractRequestResponse {
     private static final String NODE_ID_KEY_NAME = "node_id";
     private static final String HOST_KEY_NAME = "host";
     private static final String PORT_KEY_NAME = "port";
+    private static final String RACK_KEY_NAME = "rack";
+
+    private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
+    public static final int NO_CONTROLLER_ID = -1;
 
     // topic level field names
     private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
@@ -54,6 +57,7 @@ public class MetadataResponse extends AbstractRequestResponse {
      */
 
     private static final String TOPIC_KEY_NAME = "topic";
+    private static final String IS_INTERNAL_KEY_NAME = "is_internal";
     private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
 
     // partition level field names
@@ -72,13 +76,24 @@ public class MetadataResponse extends AbstractRequestResponse {
     private static final String ISR_KEY_NAME = "isr";
 
     private final Collection<Node> brokers;
+    private final Node controller;
     private final List<TopicMetadata> topicMetadata;
 
+    /**
+     * Constructor for the latest version
+     */
+    public MetadataResponse(List<Node> brokers, int controllerId, List<TopicMetadata> topicMetadata) {
+        this(brokers, controllerId, topicMetadata, CURRENT_VERSION);
+    }
 
-    public MetadataResponse(List<Node> brokers, List<TopicMetadata> topicMetadata) {
-        super(new Struct(CURRENT_SCHEMA));
+    /**
+     * Constructor for a specific version
+     */
+    public MetadataResponse(List<Node> brokers, int controllerId, List<TopicMetadata> topicMetadata, int version) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version)));
 
         this.brokers = brokers;
+        this.controller = getControllerNode(controllerId, brokers);
         this.topicMetadata = topicMetadata;
 
         List<Struct> brokerArray = new ArrayList<>();
@@ -87,15 +102,25 @@ public class MetadataResponse extends AbstractRequestResponse {
             broker.set(NODE_ID_KEY_NAME, node.id());
             broker.set(HOST_KEY_NAME, node.host());
             broker.set(PORT_KEY_NAME, node.port());
+            // This field only exists in v1+
+            if (broker.hasField(RACK_KEY_NAME))
+                broker.set(RACK_KEY_NAME, node.rack());
             brokerArray.add(broker);
         }
         struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
 
+        // This field only exists in v1+
+        if (struct.hasField(CONTROLLER_ID_KEY_NAME))
+            struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+
         List<Struct> topicMetadataArray = new ArrayList<>(topicMetadata.size());
         for (TopicMetadata metadata : topicMetadata) {
             Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, metadata.topic);
             topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code());
+            // This field only exists in v1+
+            if (topicData.hasField(IS_INTERNAL_KEY_NAME))
+                topicData.set(IS_INTERNAL_KEY_NAME, metadata.isInternal());
 
             List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size());
             for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
@@ -130,15 +155,28 @@ public class MetadataResponse extends AbstractRequestResponse {
             int nodeId = broker.getInt(NODE_ID_KEY_NAME);
             String host = broker.getString(HOST_KEY_NAME);
             int port = broker.getInt(PORT_KEY_NAME);
-            brokers.put(nodeId, new Node(nodeId, host, port));
+            // This field only exists in v1+
+            // When we can't know if a rack exists in a v0 response we default to null
+            String rack =  broker.hasField(RACK_KEY_NAME) ? broker.getString(RACK_KEY_NAME) : null;
+            brokers.put(nodeId, new Node(nodeId, host, port, rack));
         }
 
+        // This field only exists in v1+
+        // Since a v0 response carries no controller id, default to NO_CONTROLLER_ID
+        int controllerId = NO_CONTROLLER_ID;
+        if (struct.hasField(CONTROLLER_ID_KEY_NAME))
+            controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
+
         List<TopicMetadata> topicMetadata = new ArrayList<>();
         Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME);
         for (int i = 0; i < topicInfos.length; i++) {
             Struct topicInfo = (Struct) topicInfos[i];
             Errors topicError = Errors.forCode(topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME));
             String topic = topicInfo.getString(TOPIC_KEY_NAME);
+            // This field only exists in v1+
+            // Since a v0 response does not say whether a topic is internal, default to false
+            boolean isInternal = topicInfo.hasField(IS_INTERNAL_KEY_NAME) ? topicInfo.getBoolean(IS_INTERNAL_KEY_NAME) : false;
+
             List<PartitionMetadata> partitionMetadata = new ArrayList<>();
 
             Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
@@ -149,23 +187,41 @@ public class MetadataResponse extends AbstractRequestResponse {
                 int leader = partitionInfo.getInt(LEADER_KEY_NAME);
                 Node leaderNode = leader == -1 ? null : brokers.get(leader);
                 Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
+
                 List<Node> replicaNodes = new ArrayList<>(replicas.length);
                 for (Object replicaNodeId : replicas)
-                    replicaNodes.add(brokers.get(replicaNodeId));
+                    if (brokers.containsKey(replicaNodeId))
+                        replicaNodes.add(brokers.get(replicaNodeId));
+                    else
+                        replicaNodes.add(new Node((int) replicaNodeId, "", -1));
+
                 Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
                 List<Node> isrNodes = new ArrayList<>(isr.length);
                 for (Object isrNode : isr)
-                    isrNodes.add(brokers.get(isrNode));
+                    if (brokers.containsKey(isrNode))
+                        isrNodes.add(brokers.get(isrNode));
+                    else
+                        isrNodes.add(new Node((int) isrNode, "", -1));
+
                 partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, replicaNodes, isrNodes));
             }
 
-            topicMetadata.add(new TopicMetadata(topicError, topic, partitionMetadata));
+            topicMetadata.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadata));
         }
 
         this.brokers = brokers.values();
+        this.controller = getControllerNode(controllerId, brokers.values());
         this.topicMetadata = topicMetadata;
     }
 
+    private Node getControllerNode(int controllerId, Collection<Node> brokers) {
+        for (Node broker : brokers) {
+            if (broker.id() == controllerId)
+                return broker;
+        }
+        return null;
+    }
+
     /**
      * Get a map of the topics which had metadata errors
      * @return the map
@@ -211,20 +267,43 @@ public class MetadataResponse extends AbstractRequestResponse {
         return brokers;
     }
 
+    /**
+     * Get all topic metadata returned in the metadata response
+     * @return the topicMetadata
+     */
+    public Collection<TopicMetadata> topicMetadata() {
+        return topicMetadata;
+    }
+
+    /**
+     * The controller node returned in metadata response
+     * @return the controller node or null if it doesn't exist
+     */
+    public Node controller() {
+        return controller;
+    }
+
     public static MetadataResponse parse(ByteBuffer buffer) {
-        return new MetadataResponse(CURRENT_SCHEMA.read(buffer));
+        return parse(buffer, CURRENT_VERSION);
+    }
+
+    public static MetadataResponse parse(ByteBuffer buffer, int version) {
+        return new MetadataResponse(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version).read(buffer));
     }
 
     public static class TopicMetadata {
         private final Errors error;
         private final String topic;
+        private final boolean isInternal;
         private final List<PartitionMetadata> partitionMetadata;
 
         public TopicMetadata(Errors error,
                              String topic,
+                             boolean isInternal,
                              List<PartitionMetadata> partitionMetadata) {
             this.error = error;
             this.topic = topic;
+            this.isInternal = isInternal;
             this.partitionMetadata = partitionMetadata;
         }
 
@@ -236,6 +315,10 @@ public class MetadataResponse extends AbstractRequestResponse {
             return topic;
         }
 
+        public boolean isInternal() {
+            return isInternal;
+        }
+
         public List<PartitionMetadata> partitionMetadata() {
             return partitionMetadata;
         }
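
When a v0 response is parsed, the fields added in v1 fall back to the defaults
noted in the comments above: rack is null, the controller is unknown, and no
topic is marked internal. A sketch of reading those defaults, assuming the
buffer argument holds a serialized v0 response body:

import java.nio.ByteBuffer;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.MetadataResponse;

public class V0DefaultsExample {
    static void readV0(ByteBuffer buffer) { // buffer: hypothetical serialized v0 response
        MetadataResponse response = MetadataResponse.parse(buffer, 0);
        System.out.println(response.controller()); // null: v0 has no controller_id field
        for (Node broker : response.brokers())
            System.out.println(broker.rack());     // null: v0 has no rack field
        for (MetadataResponse.TopicMetadata metadata : response.topicMetadata())
            System.out.println(metadata.isInternal()); // false: v0 has no is_internal flag
    }
}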

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 9002e81..49bff10 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.MockTime;
@@ -478,14 +479,14 @@ public class FetcherTest {
     @Test(expected = InvalidTopicException.class)
     public void testGetTopicMetadataInvalidTopic() {
         client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION).toStruct());
-        fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+        fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
     }
 
     @Test
     public void testGetTopicMetadataUnknownTopic() {
         client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION).toStruct());
 
-        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
         assertNull(topicMetadata.get(topicName));
     }
 
@@ -494,7 +495,7 @@ public class FetcherTest {
         client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE).toStruct());
         client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
 
-        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
         assertTrue(topicMetadata.containsKey(topicName));
     }
 
@@ -570,8 +571,8 @@ public class FetcherTest {
             }
         }
 
-        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, partitionsMetadata);
-        return new MetadataResponse(cluster.nodes(), Arrays.asList(topicMetadata));
+        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
+        return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
     }
 
     private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 5c34277..e91b2fb 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -33,7 +33,8 @@ public class ProtocolSerializationTest {
 
     @Before
     public void setup() {
-        this.schema = new Schema(new Field("int8", Type.INT8),
+        this.schema = new Schema(new Field("boolean", Type.BOOLEAN),
+                                 new Field("int8", Type.INT8),
                                  new Field("int16", Type.INT16),
                                  new Field("int32", Type.INT32),
                                  new Field("int64", Type.INT64),
@@ -42,8 +43,10 @@ public class ProtocolSerializationTest {
                                  new Field("bytes", Type.BYTES),
                                  new Field("nullable_bytes", Type.NULLABLE_BYTES),
                                  new Field("array", new ArrayOf(Type.INT32)),
+                                 new Field("null_array", ArrayOf.nullable(Type.INT32)),
                                  new Field("struct", new Schema(new Field("field", new ArrayOf(Type.INT32)))));
-        this.struct = new Struct(this.schema).set("int8", (byte) 1)
+        this.struct = new Struct(this.schema).set("boolean", true)
+                                             .set("int8", (byte) 1)
                                              .set("int16", (short) 1)
                                              .set("int32", 1)
                                              .set("int64", 1L)
@@ -51,12 +54,15 @@ public class ProtocolSerializationTest {
                                              .set("nullable_string", null)
                                              .set("bytes", ByteBuffer.wrap("1".getBytes()))
                                              .set("nullable_bytes", null)
-                                             .set("array", new Object[] {1});
+                                             .set("array", new Object[] {1})
+                                             .set("null_array", null);
         this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] {1, 2, 3}));
     }
 
     @Test
     public void testSimple() {
+        check(Type.BOOLEAN, false);
+        check(Type.BOOLEAN, true);
         check(Type.INT8, (byte) -111);
         check(Type.INT16, (short) -11111);
         check(Type.INT32, -11111111);
@@ -75,6 +81,7 @@ public class ProtocolSerializationTest {
         check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4});
         check(new ArrayOf(Type.STRING), new Object[] {});
         check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"});
+        check(ArrayOf.nullable(Type.STRING), null);
     }
 
     @Test
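
The BOOLEAN type and nullable arrays exercised above are the primitives behind the new wire format: isInternal is a boolean, and a nullable topic array is what lets a v1 request distinguish "all topics" from "no topics". A small Scala round trip, as a sketch only:

    import java.nio.ByteBuffer
    import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct, Type}

    val schema = new Schema(
      new Field("flag", Type.BOOLEAN),
      new Field("ids", ArrayOf.nullable(Type.INT32)))
    val struct = new Struct(schema)
      .set("flag", true)
      .set("ids", null) // a nullable array may be written as null

    val buffer = ByteBuffer.allocate(schema.sizeOf(struct))
    schema.write(buffer, struct)
    buffer.rewind()
    val read = schema.read(buffer)
    assert(read.get("flag") == java.lang.Boolean.TRUE)
    assert(read.get("ids") == null)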

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 92f3101..0018f53 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -70,9 +70,10 @@ public class RequestResponseTest {
                 createListOffsetRequest(),
                 createListOffsetRequest().getErrorResponse(0, new UnknownServerException()),
                 createListOffsetResponse(),
-                createMetadataRequest(),
-                createMetadataRequest().getErrorResponse(0, new UnknownServerException()),
-                createMetadataResponse(),
+                MetadataRequest.allTopics(),
+                createMetadataRequest(Arrays.asList("topic1")),
+                createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(1, new UnknownServerException()),
+                createMetadataResponse(1),
                 createOffsetCommitRequest(2),
                 createOffsetCommitRequest(2).getErrorResponse(2, new UnknownServerException()),
                 createOffsetCommitResponse(),
@@ -100,6 +101,8 @@ public class RequestResponseTest {
         for (AbstractRequestResponse req : requestResponseList)
             checkSerialization(req, null);
 
+        createMetadataResponse(0);
+        createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(0, new UnknownServerException());
         checkSerialization(createFetchRequest().getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createOffsetCommitRequest(0), 0);
         checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
@@ -281,22 +284,22 @@ public class RequestResponseTest {
         return new ListOffsetResponse(responseData);
     }
 
-    private AbstractRequest createMetadataRequest() {
-        return new MetadataRequest(Arrays.asList("topic1"));
+    private AbstractRequest createMetadataRequest(List<String> topics) {
+        return new MetadataRequest(topics);
     }
 
-    private AbstractRequestResponse createMetadataResponse() {
+    private AbstractRequestResponse createMetadataResponse(int version) {
         Node node = new Node(1, "host1", 1001);
         List<Node> replicas = Arrays.asList(node);
         List<Node> isr = Arrays.asList(node);
 
         List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
-        allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic1",
+        allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
                 Arrays.asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr))));
-        allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2",
+        allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
                 Collections.<MetadataResponse.PartitionMetadata>emptyList()));
 
-        return new MetadataResponse(Arrays.asList(node), allTopicMetadata);
+        return new MetadataResponse(Arrays.asList(node), MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata, version);
     }
 
     private AbstractRequest createOffsetCommitRequest(int version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 24174be..a8a282e 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -605,10 +605,10 @@ object AdminUtils extends Logging {
             new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
         }
       }
-      new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toList.asJava)
+      new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.asJava)
     } else {
       // topic doesn't exist, send appropriate error code
-      new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList())
+      new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic), java.util.Collections.emptyList())
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 232db4a..9f1014f 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -170,7 +170,7 @@ object TopicCommand extends Logging {
     }
     topics.foreach { topic =>
       try {
-        if (TopicConstants.INTERNAL_TOPICS.contains(topic)) {
+        if (Topic.isInternal(topic)) {
           throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
         } else {
           zkUtils.createPersistentPath(getDeleteTopicPath(topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 77b85e0..61290c1 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -123,7 +123,7 @@ case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint]
   def getNode(protocolType: SecurityProtocol): Node = {
     val endpoint = endPoints.getOrElse(protocolType,
       throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
-    new Node(id, endpoint.host, endpoint.port)
+    new Node(id, endpoint.host, endpoint.port, rack.orNull)
   }
 
   def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 6067712..054c5eb 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,7 +18,7 @@
 package kafka.common
 
 import util.matching.Regex
-import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants.INTERNAL_TOPICS
 
 object Topic {
   val legalChars = "[a-zA-Z0-9\\._\\-]"
@@ -62,4 +62,7 @@ object Topic {
     topicA.replace('.', '_') == topicB.replace('.', '_')
   }
 
+  def isInternal(topic: String): Boolean =
+    INTERNAL_TOPICS.contains(topic)
+
 }
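
Topic.isInternal simply delegates to the client-side TopicConstants list, so brokers and tools now agree on what counts as internal. Illustrative usage, assuming the offsets topic (which the tests below mark as internal) is in that list:

    import kafka.common.Topic

    assert(Topic.isInternal("__consumer_offsets"))
    assert(!Topic.isInternal("user-topic"))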

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9afefa5..406b1bd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -24,6 +24,7 @@ import java.util.Properties
 import kafka.admin.{RackAwareMode, AdminUtils}
 import kafka.api._
 import kafka.cluster.Partition
+import kafka.common
 import kafka.common._
 import kafka.controller.KafkaController
 import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
@@ -631,12 +632,15 @@ class KafkaApis(val requestChannel: RequestChannel,
       AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
       info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
         .format(topic, numPartitions, replicationFactor))
-      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList())
+      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
+        java.util.Collections.emptyList())
     } catch {
       case e: TopicExistsException => // let it go, possibly another broker created this topic
-        new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList())
+        new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
+          java.util.Collections.emptyList())
       case itex: InvalidTopicException =>
-        new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, java.util.Collections.emptyList())
+        new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, common.Topic.isInternal(topic),
+          java.util.Collections.emptyList())
     }
   }
 
@@ -656,8 +660,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
   }
 
-  private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = {
-    val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol)
+  private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
+    val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol, errorUnavailableEndpoints)
     if (topics.isEmpty || topicResponses.size == topics.size) {
       topicResponses
     } else {
@@ -668,7 +672,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         } else if (config.autoCreateTopicsEnable) {
           createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
-          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList())
+          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic),
+            java.util.Collections.emptyList())
         }
       }
       topicResponses ++ responsesForNonExistentTopics
@@ -680,16 +685,24 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.body.asInstanceOf[MetadataRequest]
+    val requestVersion = request.header.apiVersion()
 
-    val topics = metadataRequest.topics.asScala.toSet
-    var (authorizedTopics, unauthorizedTopics) = if (metadataRequest.topics.isEmpty) {
-      //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
-      val authorized = metadataCache.getAllTopics()
-        .filter(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
-      (authorized, mutable.Set[String]())
-    } else {
+    val topics =
+      // Handle old metadata request logic. Version 0 has no way to specify "no topics".
+      if (requestVersion == 0) {
+        if (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)
+          metadataCache.getAllTopics()
+        else
+          metadataRequest.topics.asScala.toSet
+      } else {
+        if (metadataRequest.isAllTopics)
+          metadataCache.getAllTopics()
+        else
+          metadataRequest.topics.asScala.toSet
+      }
+
+    var (authorizedTopics, unauthorizedTopics) =
       topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
-    }
 
     if (authorizedTopics.nonEmpty) {
       val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
@@ -704,22 +717,32 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedTopicMetadata = unauthorizedTopics.map(topic =>
-      new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, java.util.Collections.emptyList()))
+      new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
+        java.util.Collections.emptyList()))
+
+    // In version 0 we return an error when a broker hosting a replica is unavailable,
+    // while in higher versions we simply omit that broker from the returned broker list
+    val errorUnavailableEndpoints = requestVersion == 0
+    val topicMetadata =
+      if (authorizedTopics.isEmpty)
+        Seq.empty[MetadataResponse.TopicMetadata]
+      else
+        getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints)
 
-    val topicMetadata = if (authorizedTopics.isEmpty)
-      Seq.empty[MetadataResponse.TopicMetadata]
-    else
-      getTopicMetadata(authorizedTopics, request.securityProtocol)
+    val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata
 
     val brokers = metadataCache.getAliveBrokers
 
-    trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","),
+    trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
       brokers.mkString(","), request.header.correlationId, request.header.clientId))
 
     val responseHeader = new ResponseHeader(request.header.correlationId)
+
     val responseBody = new MetadataResponse(
       brokers.map(_.getNode(request.securityProtocol)).asJava,
-      (topicMetadata ++ unauthorizedTopicMetadata).asJava
+      metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
+      completeTopicMetadata.asJava,
+      requestVersion
     )
     requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
   }
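
From the client side, the version split above amounts to how "all topics" is spelled. A Scala sketch of the two request forms, illustrative only:

    import java.util.Collections
    import org.apache.kafka.common.requests.MetadataRequest

    // v0: an empty topic list is the only way to ask for all topics,
    // so there is no way to ask for none
    val v0All = new MetadataRequest(Collections.emptyList[String]())

    // v1: the topic array is nullable; null means all topics and an
    // empty list now really means no topics
    val v1All = MetadataRequest.allTopics()
    val v1None = new MetadataRequest(Collections.emptyList[String]())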

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 06fae42..b387f2e 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -24,11 +24,11 @@ import scala.collection.{Seq, Set, mutable}
 import scala.collection.JavaConverters._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.api._
-import kafka.common.{BrokerEndPointNotAvailableException, TopicAndPartition}
+import kafka.common.{BrokerEndPointNotAvailableException, Topic, TopicAndPartition}
 import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
@@ -40,16 +40,24 @@ import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest
 private[server] class MetadataCache(brokerId: Int) extends Logging {
   private val stateChangeLogger = KafkaController.stateChangeLogger
   private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]()
+  private var controllerId: Option[Int] = None
   private val aliveBrokers = mutable.Map[Int, Broker]()
   private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
   this.logIdent = s"[Kafka Metadata Cache on broker $brokerId] "
 
-  private def getAliveEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol): Seq[Node] = {
+  // This method is the main hotspot when it comes to the performance of metadata requests;
+  // we should be careful about adding additional logic here.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def getEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol, filterUnavailableEndpoints: Boolean): Seq[Node] = {
     val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size))
     brokers.foreach { brokerId =>
-      getAliveEndpoint(brokerId, protocol).foreach(result +=)
+      val endpoint = getAliveEndpoint(brokerId, protocol) match {
+        case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None
+        case Some(node) => Some(node)
+      }
+      endpoint.foreach(result +=)
     }
     result
   }
@@ -60,7 +68,8 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
         throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not support security protocol `$protocol`"))
     }
 
-  private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  private def getPartitionMetadata(topic: String, protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
     cache.get(topic).map { partitions =>
       partitions.map { case (partitionId, partitionState) =>
         val topicPartition = TopicAndPartition(topic, partitionId)
@@ -69,7 +78,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
         val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, protocol)
 
         val replicas = partitionState.allReplicas
-        val replicaInfo = getAliveEndpoints(replicas, protocol)
+        val replicaInfo = getEndpoints(replicas, protocol, errorUnavailableEndpoints)
 
         maybeLeader match {
           case None =>
@@ -79,7 +88,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
 
           case Some(leader) =>
             val isr = leaderAndIsr.isr
-            val isrInfo = getAliveEndpoints(isr, protocol)
+            val isrInfo = getEndpoints(isr, protocol, errorUnavailableEndpoints)
 
             if (replicaInfo.size < replicas.size) {
               debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
@@ -101,12 +110,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = {
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
     inReadLock(partitionMetadataLock) {
-      val topicsRequested = if (topics.isEmpty) cache.keySet else topics
-      topicsRequested.toSeq.flatMap { topic =>
-        getPartitionMetadata(topic, protocol).map { partitionMetadata =>
-          new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toBuffer.asJava)
+      topics.toSeq.flatMap { topic =>
+        getPartitionMetadata(topic, protocol, errorUnavailableEndpoints).map { partitionMetadata =>
+          new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
         }
       }
     }
@@ -151,8 +160,14 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def getControllerId: Option[Int] = controllerId
+
   def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) {
     inWriteLock(partitionMetadataLock) {
+      controllerId = updateMetadataRequest.controllerId match {
+          case id if id < 0 => None
+          case id => Some(id)
+        }
       aliveNodes.clear()
       aliveBrokers.clear()
       updateMetadataRequest.liveBrokers.asScala.foreach { broker =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9bbd29e..888912b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,6 +28,7 @@ import kafka.log.{LogAppendInfo, LogManager}
 import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import org.I0Itec.zkclient.IZkChildListener
 import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException,
 InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException,
 InvalidTimestampException}
@@ -39,7 +40,6 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time => JTime}
 import scala.collection._
 import scala.collection.JavaConverters._
-import org.apache.kafka.common.internals.TopicConstants
 
 /*
  * Result metadata of a log append operation on the log
@@ -394,7 +394,7 @@ class ReplicaManager(val config: KafkaConfig,
       BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
 
       // reject appending to internal topics if it is not allowed
-      if (TopicConstants.INTERNAL_TOPICS.contains(topicPartition.topic) && !internalTopicsAllowed) {
+      if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
         (topicPartition, LogAppendResult(
           LogAppendInfo.UnknownLogAppendInfo,
           Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic)))))

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
new file mode 100644
index 0000000..3d05c1d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -0,0 +1,106 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.Properties
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.network.SocketServer
+import kafka.utils._
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, ResponseHeader}
+import org.junit.Before
+
+abstract class BaseRequestTest extends KafkaServerTestHarness {
+  val numBrokers = 3
+  private var correlationId = 0
+
+  // Override properties by mutating the passed Properties object
+  def propertyOverrides(properties: Properties): Unit
+
+  def generateConfigs() = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
+    props.foreach(propertyOverrides)
+    props.map(KafkaConfig.fromProps)
+  }
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
+  }
+
+  def socketServer = {
+    servers.find { server =>
+      val state = server.brokerState.currentState
+      state != NotRunning.state && state != BrokerShuttingDown.state
+    }.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
+  }
+
+  private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
+    new Socket("localhost", s.boundPort(protocol))
+  }
+
+  private def sendRequest(socket: Socket, request: Array[Byte]) {
+    val outgoing = new DataOutputStream(socket.getOutputStream)
+    outgoing.writeInt(request.length)
+    outgoing.write(request)
+    outgoing.flush()
+  }
+
+  private def receiveResponse(socket: Socket): Array[Byte] = {
+    val incoming = new DataInputStream(socket.getInputStream)
+    val len = incoming.readInt()
+    val response = new Array[Byte](len)
+    incoming.readFully(response)
+    response
+  }
+
+  private def requestAndReceive(request: Array[Byte]): Array[Byte] = {
+    val plainSocket = connect()
+    try {
+      sendRequest(plainSocket, request)
+      receiveResponse(plainSocket)
+    } finally {
+      plainSocket.close()
+    }
+  }
+
+  /**
+    * Serializes and sends the request to the given API. Returns a ByteBuffer containing the response.
+    */
+  def send(request: AbstractRequest, apiKey: ApiKeys, version: Short): ByteBuffer = {
+    correlationId += 1
+    val serializedBytes = {
+      val header = new RequestHeader(apiKey.id, version, "", correlationId)
+      val byteBuffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf)
+      header.writeTo(byteBuffer)
+      request.writeTo(byteBuffer)
+      byteBuffer.array()
+    }
+
+    val response = requestAndReceive(serializedBytes)
+
+    val responseBuffer = ByteBuffer.wrap(response)
+    ResponseHeader.parse(responseBuffer) // Parse the header to ensure it's valid and move the buffer forward
+    responseBuffer
+  }
+}
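
A subclass would typically pair send() with the version-aware parse overload added to MetadataResponse. A hypothetical wrapper, sketched here for illustration (the helper name is not part of this file):

    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}

    // Serialize at the given version, then decode the response body
    // with the same version
    def sendMetadataRequest(request: MetadataRequest, version: Short): MetadataResponse =
      MetadataResponse.parse(send(request, ApiKeys.METADATA, version), version)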

http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 017faea..770513c 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -158,7 +158,8 @@ class MetadataCacheTest {
     val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava)
     cache.updateCache(15, updateMetadataRequest)
 
-    val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
+    // Validate errorUnavailableEndpoints = false
+    val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false)
     assertEquals(1, topicMetadatas.size)
 
     val topicMetadata = topicMetadatas.head
@@ -169,9 +170,25 @@ class MetadataCacheTest {
 
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
-    assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error)
-    assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet)
+    assertEquals(Errors.NONE, partitionMetadata.error)
+    assertEquals(Set(0, 1), partitionMetadata.replicas.asScala.map(_.id).toSet)
     assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
+
+    // Validate errorUnavailableEndpoints = true
+    val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true)
+    assertEquals(1, topicMetadatasWithError.size)
+
+    val topicMetadataWithError = topicMetadatasWithError.head
+    assertEquals(Errors.NONE, topicMetadataWithError.error)
+
+    val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata
+    assertEquals(1, partitionMetadatasWithError.size)
+
+    val partitionMetadataWithError = partitionMetadatasWithError.get(0)
+    assertEquals(0, partitionMetadataWithError.partition)
+    assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
+    assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet)
   }
 
   @Test
@@ -197,7 +214,8 @@ class MetadataCacheTest {
     val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava)
     cache.updateCache(15, updateMetadataRequest)
 
-    val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
+    // Validate errorUnavailableEndpoints = false
+    val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false)
     assertEquals(1, topicMetadatas.size)
 
     val topicMetadata = topicMetadatas.head
@@ -208,9 +226,25 @@ class MetadataCacheTest {
 
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
-    assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error)
+    assertEquals(Errors.NONE, partitionMetadata.error)
     assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0, 1), partitionMetadata.isr.asScala.map(_.id).toSet)
+
+    // Validate errorUnavailableEndpoints = true
+    val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true)
+    assertEquals(1, topicMetadatasWithError.size)
+
+    val topicMetadataWithError = topicMetadatasWithError.head
+    assertEquals(Errors.NONE, topicMetadataWithError.error)
+
+    val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata
+    assertEquals(1, partitionMetadatasWithError.size)
+
+    val partitionMetadataWithError = partitionMetadatasWithError.get(0)
+    assertEquals(0, partitionMetadataWithError.partition)
+    assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
+    assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet)
   }
 
   @Test