Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/02 20:27:41 UTC

kafka git commit: KAFKA-2880: consumer should handle disconnect/timeout for metadata requests

Repository: kafka
Updated Branches:
  refs/heads/trunk 8c3c9548b -> 9fb1e2573


KAFKA-2880: consumer should handle disconnect/timeout for metadata requests

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #581 from hachikuji/KAFKA-2880


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fb1e257
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fb1e257
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fb1e257

Branch: refs/heads/trunk
Commit: 9fb1e25738f89b75a36ef69f730b0e138ccd55b1
Parents: 8c3c954
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Dec 2 11:27:36 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Dec 2 11:27:36 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |   6 +
 .../consumer/internals/AbstractCoordinator.java |  35 +--
 .../consumer/internals/ConsumerCoordinator.java |   5 +-
 .../internals/ConsumerNetworkClient.java        |  23 +-
 .../clients/consumer/internals/Fetcher.java     | 267 ++++++++++---------
 .../common/errors/DisconnectException.java      |   1 +
 .../kafka/common/requests/MetadataResponse.java |  37 ++-
 .../clients/consumer/internals/FetcherTest.java |  67 ++++-
 .../org/apache/kafka/common/utils/MockTime.java |   8 +
 .../main/scala/kafka/admin/AdminClient.scala    |   6 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  14 +
 .../kafka/api/PlaintextConsumerTest.scala       |  16 +-
 12 files changed, 313 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6c15df3..9b36af6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1136,6 +1136,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic
+     * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+     *             expiration of the configured request timeout
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
@@ -1160,6 +1163,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The map of topics and their partitions
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
+     * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+     *             expiration of the configured request timeout
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
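
For callers, the practical effect of the new @throws contract is that these
blocking calls are now bounded by the configured request timeout rather than
hanging indefinitely on disconnects. A minimal caller-side sketch (the helper
and its name are hypothetical; only partitionsFor() and the exception types
come from this patch):

    import java.util.List;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.errors.TimeoutException;

    // Returns null if metadata could not be fetched in time; any other
    // KafkaException is unrecoverable and propagates to the caller.
    static List<PartitionInfo> partitionsOrNull(Consumer<?, ?> consumer, String topic) {
        try {
            return consumer.partitionsFor(topic);
        } catch (TimeoutException e) {
            // the request timeout expired before metadata arrived; safe to retry
            return null;
        }
    }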

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 9aa1aaf..322de5c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -43,7 +43,6 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -235,7 +234,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     continue;
                 else if (!future.isRetriable())
                     throw exception;
-                Utils.sleep(retryBackoffMs);
+                time.sleep(retryBackoffMs);
             }
         }
     }
@@ -484,11 +483,7 @@ public abstract class AbstractCoordinator implements Closeable {
     private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
         log.debug("Group metadata response {}", resp);
 
-        // parse the response to get the coordinator info if it is not disconnected,
-        // otherwise we need to request metadata update
-        if (resp.wasDisconnected()) {
-            future.raise(new DisconnectException());
-        } else if (!coordinatorUnknown()) {
+        if (!coordinatorUnknown()) {
             // We already found the coordinator, so ignore the request
             future.complete(null);
         } else {
@@ -661,25 +656,19 @@ public abstract class AbstractCoordinator implements Closeable {
         public abstract void handle(R response, RequestFuture<T> future);
 
         @Override
-        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
-            this.response = clientResponse;
-
-            if (clientResponse.wasDisconnected()) {
-                int correlation = response.request().request().header().correlationId();
-                log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
-                        response.request(),
-                        correlation,
-                        response.request().request().destination());
-
-                // mark the coordinator as dead
+        public void onFailure(RuntimeException e, RequestFuture<T> future) {
+            // mark the coordinator as dead
+            if (e instanceof DisconnectException)
                 coordinatorDead();
-                future.raise(new DisconnectException());
-                return;
-            }
+            future.raise(e);
+        }
 
+        @Override
+        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
             try {
-                R response = parse(clientResponse);
-                handle(response, future);
+                this.response = clientResponse;
+                R responseObj = parse(clientResponse);
+                handle(responseObj, future);
             } catch (RuntimeException e) {
                 if (!future.isDone())
                     future.raise(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index f6d1029..8453c7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -42,7 +42,6 @@ import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -294,7 +293,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             if (!future.isRetriable())
                 throw future.exception();
 
-            Utils.sleep(retryBackoffMs);
+            time.sleep(retryBackoffMs);
         }
     }
 
@@ -358,7 +357,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             if (!future.isRetriable())
                 throw future.exception();
 
-            Utils.sleep(retryBackoffMs);
+            time.sleep(retryBackoffMs);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index f2e215d..84c312e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -18,12 +18,15 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -45,6 +48,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
  */
 public class ConsumerNetworkClient implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
+
     private final KafkaClient client;
     private final AtomicBoolean wakeup = new AtomicBoolean(false);
     private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
@@ -88,7 +93,9 @@ public class ConsumerNetworkClient implements Closeable {
      * Send a new request. Note that the request is not actually transmitted on the
      * network until one of the {@link #poll(long)} variants is invoked. At this
      * point the request will either be transmitted successfully or will fail.
-     * Use the returned future to obtain the result of the send.
+     * Use the returned future to obtain the result of the send. Note that there is no
+     * need to check for disconnects explicitly on the {@link ClientResponse} object;
+     * instead, the future will be failed with a {@link DisconnectException}.
      * @param node The destination of the request
      * @param api The Kafka API call
      * @param request The request payload
@@ -258,7 +265,7 @@ public class ConsumerNetworkClient implements Closeable {
                 for (ClientRequest request : requestEntry.getValue()) {
                     RequestFutureCompletionHandler handler =
                             (RequestFutureCompletionHandler) request.callback();
-                    handler.complete(new ClientResponse(request, now, true, null));
+                    handler.onComplete(new ClientResponse(request, now, true, null));
                 }
                 iterator.remove();
             }
@@ -350,7 +357,17 @@ public class ConsumerNetworkClient implements Closeable {
 
         @Override
         public void onComplete(ClientResponse response) {
-            complete(response);
+            if (response.wasDisconnected()) {
+                ClientRequest request = response.request();
+                RequestSend send = request.request();
+                ApiKeys api = ApiKeys.forId(send.header().apiKey());
+                int correlation = send.header().correlationId();
+                log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
+                        api, request, correlation, send.destination());
+                raise(DisconnectException.INSTANCE);
+            } else {
+                complete(response);
+            }
         }
     }
 }
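
Taken together, the new contract for callers of ConsumerNetworkClient.send()
can be sketched as follows (every call below appears elsewhere in this patch;
the node/topics setup is assumed):

    // A disconnect now fails the future with the retriable DisconnectException,
    // so callers never inspect ClientResponse.wasDisconnected() themselves.
    RequestFuture<ClientResponse> future =
            client.send(node, ApiKeys.METADATA, new MetadataRequest(topics));
    client.poll(future);

    if (future.succeeded()) {
        // parse future.value().responseBody() as usual
    } else if (future.isRetriable()) {
        // e.g. DisconnectException: back off and resend
    } else {
        throw future.exception(); // fatal
    }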

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index a034264..5708869 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -25,9 +25,11 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -113,7 +115,7 @@ public class Fetcher<K, V> {
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
 
-        this.records = new LinkedList<PartitionRecords<K, V>>();
+        this.records = new LinkedList<>();
         this.offsetOutOfRangePartitions = new HashMap<>();
         this.unauthorizedTopics = new HashSet<>();
         this.recordTooLargePartitions = new HashMap<>();
@@ -191,45 +193,82 @@ public class Fetcher<K, V> {
         if (topics != null && topics.isEmpty())
             return Collections.emptyMap();
 
-        final HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
-        long startTime = time.milliseconds();
+        long start = time.milliseconds();
+        long remaining = timeout;
 
-        while (time.milliseconds() - startTime < timeout) {
-            RequestFuture<ClientResponse> requestFuture = sendMetadataRequest(topics);
-            if (requestFuture != null) {
-                client.poll(requestFuture);
+        do {
+            RequestFuture<ClientResponse> future = sendMetadataRequest(topics);
+            client.poll(future, remaining);
 
-                if (requestFuture.succeeded()) {
-                    MetadataResponse response =
-                        new MetadataResponse(requestFuture.value().responseBody());
+            if (future.failed() && !future.isRetriable())
+                throw future.exception();
 
-                    for (String topic : response.cluster().topics())
-                        topicsPartitionInfos.put(
-                            topic, response.cluster().availablePartitionsForTopic(topic));
+            if (future.succeeded()) {
+                MetadataResponse response = new MetadataResponse(future.value().responseBody());
+                Cluster cluster = response.cluster();
+
+                Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
+                if (!unauthorizedTopics.isEmpty())
+                    throw new TopicAuthorizationException(unauthorizedTopics);
+
+                boolean shouldRetry = false;
+                if (!response.errors().isEmpty()) {
+                    // if there were errors, we need to check whether they were fatal or whether
+                    // we should just retry
+
+                    log.debug("Topic metadata fetch included errors: {}", response.errors());
+
+                    for (Map.Entry<String, Errors> errorEntry : response.errors().entrySet()) {
+                        String topic = errorEntry.getKey();
+                        Errors error = errorEntry.getValue();
+
+                        if (error == Errors.INVALID_TOPIC_EXCEPTION)
+                            throw new InvalidTopicException("Topic '" + topic + "' is invalid");
+                        else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                            // if a requested topic is unknown, we just continue and let it be absent
+                            // in the returned map
+                            continue;
+                        else if (error.exception() instanceof RetriableException)
+                            shouldRetry = true;
+                        else
+                            throw new KafkaException("Unexpected error fetching metadata for topic " + topic,
+                                    error.exception());
+                    }
+                }
 
+                if (!shouldRetry) {
+                    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
+                    for (String topic : cluster.topics())
+                        topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
                     return topicsPartitionInfos;
                 }
-
-                if (!requestFuture.isRetriable())
-                    throw requestFuture.exception();
             }
 
-            Utils.sleep(retryBackoffMs);
-        }
+            long elapsed = time.milliseconds() - start;
+            remaining = timeout - elapsed;
 
-        return topicsPartitionInfos;
+            if (remaining > 0) {
+                long backoff = Math.min(remaining, retryBackoffMs);
+                time.sleep(backoff);
+                remaining -= backoff;
+            }
+        } while (remaining > 0);
+
+        throw new TimeoutException("Timeout expired while fetching topic metadata");
     }
 
     /**
      * Send a metadata request to the least loaded node in the Kafka cluster asynchronously
      * @return A future that indicates the result of the metadata request
      */
-    public RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) {
+    private RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) {
         if (topics == null)
             topics = Collections.emptyList();
         final Node node = client.leastLoadedNode();
-        return node == null ? null :
-            client.send(node, ApiKeys.METADATA, new MetadataRequest(topics));
+        if (node == null)
+            return RequestFuture.noBrokersAvailable();
+        else
+            return client.send(node, ApiKeys.METADATA, new MetadataRequest(topics));
     }
 
     /**
@@ -277,7 +316,7 @@ public class Fetcher<K, V> {
             if (future.exception() instanceof InvalidMetadataException)
                 client.awaitMetadataUpdate();
             else
-                Utils.sleep(retryBackoffMs);
+                time.sleep(retryBackoffMs);
         }
     }
 
@@ -435,29 +474,25 @@ public class Fetcher<K, V> {
     private void handleListOffsetResponse(TopicPartition topicPartition,
                                           ClientResponse clientResponse,
                                           RequestFuture<Long> future) {
-        if (clientResponse.wasDisconnected()) {
-            future.raise(new DisconnectException());
+        ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
+        short errorCode = lor.responseData().get(topicPartition).errorCode;
+        if (errorCode == Errors.NONE.code()) {
+            List<Long> offsets = lor.responseData().get(topicPartition).offsets;
+            if (offsets.size() != 1)
+                throw new IllegalStateException("This should not happen.");
+            long offset = offsets.get(0);
+            log.debug("Fetched offset {} for partition {}", offset, topicPartition);
+
+            future.complete(offset);
+        } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+                || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+            log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+                    topicPartition);
+            future.raise(Errors.forCode(errorCode));
         } else {
-            ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
-            short errorCode = lor.responseData().get(topicPartition).errorCode;
-            if (errorCode == Errors.NONE.code()) {
-                List<Long> offsets = lor.responseData().get(topicPartition).offsets;
-                if (offsets.size() != 1)
-                    throw new IllegalStateException("This should not happen.");
-                long offset = offsets.get(0);
-                log.debug("Fetched offset {} for partition {}", offset, topicPartition);
-
-                future.complete(offset);
-            } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                    || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
-                log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
-                        topicPartition);
-                future.raise(Errors.forCode(errorCode));
-            } else {
-                log.error("Attempt to fetch offsets for partition {} failed due to: {}",
-                        topicPartition, Errors.forCode(errorCode).exception().getMessage());
-                future.raise(new StaleMetadataException());
-            }
+            log.error("Attempt to fetch offsets for partition {} failed due to: {}",
+                    topicPartition, Errors.forCode(errorCode).exception().getMessage());
+            future.raise(new StaleMetadataException());
         }
     }
 
@@ -502,84 +537,78 @@ public class Fetcher<K, V> {
      * The callback for fetch completion
      */
     private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
-        if (resp.wasDisconnected()) {
-            int correlation = resp.request().request().header().correlationId();
-            log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected",
-                resp.request(), correlation, resp.request().request().destination());
-        } else {
-            int totalBytes = 0;
-            int totalCount = 0;
-            FetchResponse response = new FetchResponse(resp.responseBody());
-            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                FetchResponse.PartitionData partition = entry.getValue();
-                if (!subscriptions.isFetchable(tp)) {
-                    // this can happen when a rebalance happened or a partition consumption paused
-                    // while fetch is still in-flight
-                    log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
-                } else if (partition.errorCode == Errors.NONE.code()) {
-                    long fetchOffset = request.fetchData().get(tp).offset;
-
-                    // we are interested in this fetch only if the beginning offset matches the
-                    // current consumed position
-                    Long consumed = subscriptions.consumed(tp);
-                    if (consumed == null) {
-                        continue;
-                    } else if (consumed != fetchOffset) {
-                        // the fetched position has gotten out of sync with the consumed position
-                        // (which might happen when a rebalance occurs with a fetch in-flight),
-                        // so we need to reset the fetch position so the next fetch is right
-                        subscriptions.fetched(tp, consumed);
-                        continue;
-                    }
-
-                    int bytes = 0;
-                    ByteBuffer buffer = partition.recordSet;
-                    MemoryRecords records = MemoryRecords.readableRecords(buffer);
-                    List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
-                    for (LogEntry logEntry : records) {
-                        parsed.add(parseRecord(tp, logEntry));
-                        bytes += logEntry.size();
-                    }
+        int totalBytes = 0;
+        int totalCount = 0;
+        FetchResponse response = new FetchResponse(resp.responseBody());
+        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+            TopicPartition tp = entry.getKey();
+            FetchResponse.PartitionData partition = entry.getValue();
+            if (!subscriptions.isFetchable(tp)) {
+                // this can happen when a rebalance happened or a partition consumption paused
+                // while fetch is still in-flight
+                log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
+            } else if (partition.errorCode == Errors.NONE.code()) {
+                long fetchOffset = request.fetchData().get(tp).offset;
+
+                // we are interested in this fetch only if the beginning offset matches the
+                // current consumed position
+                Long consumed = subscriptions.consumed(tp);
+                if (consumed == null) {
+                    continue;
+                } else if (consumed != fetchOffset) {
+                    // the fetched position has gotten out of sync with the consumed position
+                    // (which might happen when a rebalance occurs with a fetch in-flight),
+                    // so we need to reset the fetch position so the next fetch is right
+                    subscriptions.fetched(tp, consumed);
+                    continue;
+                }
 
-                    if (!parsed.isEmpty()) {
-                        ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
-                        this.subscriptions.fetched(tp, record.offset() + 1);
-                        this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
-                        this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
-                    } else if (buffer.limit() > 0) {
-                        // we did not read a single message from a non-empty buffer
-                        // because that message's size is larger than fetch size, in this case
-                        // record this exception
-                        this.recordTooLargePartitions.put(tp, fetchOffset);
-                    }
+                int bytes = 0;
+                ByteBuffer buffer = partition.recordSet;
+                MemoryRecords records = MemoryRecords.readableRecords(buffer);
+                List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
+                for (LogEntry logEntry : records) {
+                    parsed.add(parseRecord(tp, logEntry));
+                    bytes += logEntry.size();
+                }
 
-                    this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
-                    totalBytes += bytes;
-                    totalCount += parsed.size();
-                } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                    || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
-                    this.metadata.requestUpdate();
-                } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
-                    long fetchOffset = request.fetchData().get(tp).offset;
-                    if (subscriptions.hasDefaultOffsetResetPolicy())
-                        subscriptions.needOffsetReset(tp);
-                    else
-                        this.offsetOutOfRangePartitions.put(tp, fetchOffset);
-                    log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
-                } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
-                    log.warn("Not authorized to read from topic {}.", tp.topic());
-                    unauthorizedTopics.add(tp.topic());
-                } else if (partition.errorCode == Errors.UNKNOWN.code()) {
-                    log.warn("Unknown error fetching data for topic-partition {}", tp);
-                } else {
-                    throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
+                if (!parsed.isEmpty()) {
+                    ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
+                    this.subscriptions.fetched(tp, record.offset() + 1);
+                    this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
+                    this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
+                } else if (buffer.limit() > 0) {
+                    // we did not read a single message from a non-empty buffer
+                    // because that message's size is larger than fetch size, in this case
+                    // record this exception
+                    this.recordTooLargePartitions.put(tp, fetchOffset);
                 }
+
+                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
+                totalBytes += bytes;
+                totalCount += parsed.size();
+            } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+                || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                this.metadata.requestUpdate();
+            } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
+                long fetchOffset = request.fetchData().get(tp).offset;
+                if (subscriptions.hasDefaultOffsetResetPolicy())
+                    subscriptions.needOffsetReset(tp);
+                else
+                    this.offsetOutOfRangePartitions.put(tp, fetchOffset);
+                log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
+            } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
+                log.warn("Not authorized to read from topic {}.", tp.topic());
+                unauthorizedTopics.add(tp.topic());
+            } else if (partition.errorCode == Errors.UNKNOWN.code()) {
+                log.warn("Unknown error fetching data for topic-partition {}", tp);
+            } else {
+                throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
             }
-            this.sensors.bytesFetched.record(totalBytes);
-            this.sensors.recordsFetched.record(totalCount);
-            this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
         }
+        this.sensors.bytesFetched.record(totalBytes);
+        this.sensors.recordsFetched.record(totalCount);
+        this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
         this.sensors.fetchLatency.record(resp.requestLatencyMs());
     }
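
The caller-visible change in getTopicMetadata() above: the old loop could fall
through and silently return a partial (even empty) map once the timeout
elapsed, whereas the new one raises TimeoutException on expiry, backs off on
retriable errors for at most the remaining time, and rethrows fatal ones. A
usage sketch (topic name and timeout are illustrative):

    try {
        Map<String, List<PartitionInfo>> metadata =
                fetcher.getTopicMetadata(Collections.singletonList("my-topic"), 5000L);
        // every entry here is complete; unknown topics are simply absent
    } catch (TimeoutException e) {
        // no usable metadata within 5 seconds
    }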
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
index 18d61a2..557681c 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
@@ -17,6 +17,7 @@ package org.apache.kafka.common.errors;
  * Server disconnected before a request could be completed.
  */
 public class DisconnectException extends RetriableException {
+    public static final DisconnectException INSTANCE = new DisconnectException();
 
     private static final long serialVersionUID = 1L;
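
A note on the shared INSTANCE: raising one pre-allocated exception per
cancelled request (as ConsumerNetworkClient now does) presumably avoids
allocating a fresh exception, and capturing its stack trace, on every
disconnect; the trade-off is that the shared instance's stack trace points at
class initialization rather than at the failure site.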
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index eb163dd..170e4b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -45,9 +45,12 @@ public class MetadataResponse extends AbstractRequestResponse {
     private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
 
     /**
-     * Possible error code:
+     * Possible error codes:
      *
-     * TODO
+     * UnknownTopic (3)
+     * LeaderNotAvailable (5)
+     * InvalidTopic (17)
+     * TopicAuthorizationFailed (29)
      */
 
     private static final String TOPIC_KEY_NAME = "topic";
@@ -57,9 +60,10 @@ public class MetadataResponse extends AbstractRequestResponse {
     private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
 
     /**
-     * Possible error code:
+     * Possible error codes:
      *
-     * TODO
+     * LeaderNotAvailable (5)
+     * ReplicaNotAvailable (9)
      */
 
     private static final String PARTITION_KEY_NAME = "partition_id";
@@ -87,14 +91,20 @@ public class MetadataResponse extends AbstractRequestResponse {
         }
         struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
 
+
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (String topic : cluster.topics()) {
+        for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
             Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, errorEntry.getKey());
+            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errorEntry.getValue().code());
+            topicData.set(PARTITION_METADATA_KEY_NAME, new Struct[0]);
+            topicArray.add(topicData);
+        }
 
-            topicData.set(TOPIC_KEY_NAME, topic);
-            if (errors.containsKey(topic)) {
-                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code());
-            } else {
+        for (String topic : cluster.topics()) {
+            if (!errors.containsKey(topic)) {
+                Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
+                topicData.set(TOPIC_KEY_NAME, topic);
                 topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
                 List<Struct> partitionArray = new ArrayList<Struct>();
                 for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
@@ -113,14 +123,13 @@ public class MetadataResponse extends AbstractRequestResponse {
                     partitionArray.add(partitionData);
                 }
                 topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+                topicArray.add(topicData);
             }
-
-            topicArray.add(topicData);
         }
         struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
 
         this.cluster = cluster;
-        this.errors = new HashMap<String, Errors>();
+        this.errors = errors;
     }
 
     public MetadataResponse(Struct struct) {
@@ -183,6 +192,10 @@ public class MetadataResponse extends AbstractRequestResponse {
         return this.errors;
     }
 
+    public boolean hasError(String topic) {
+        return this.errors.containsKey(topic);
+    }
+
     public Cluster cluster() {
         return this.cluster;
     }
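
A short sketch of the reworked constructor together with the new hasError()
accessor (the constructor signature matches its use in FetcherTest below;
imports from org.apache.kafka.common.protocol are assumed):

    // Topics with errors are serialized first, each with an empty partition
    // array; error-free topics from the Cluster follow with full partition
    // metadata, and the error map itself is now retained for queries.
    MetadataResponse response = new MetadataResponse(cluster,
            Collections.singletonMap("bad-topic", Errors.LEADER_NOT_AVAILABLE));
    assert response.hasError("bad-topic");
    assert response.errors().get("bad-topic") == Errors.LEADER_NOT_AVAILABLE;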

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index fe9a6aa..1ffff4a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -22,14 +22,16 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
@@ -73,7 +75,7 @@ public class FetcherTest {
     private int maxWaitMs = 0;
     private int fetchSize = 1000;
     private long retryBackoffMs = 100;
-    private MockTime time = new MockTime();
+    private MockTime time = new MockTime(1);
     private MockClient client = new MockClient(time);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
@@ -395,7 +397,7 @@ public class FetcherTest {
     }
 
     @Test
-    public void testGetAllTopics() throws InterruptedException {
+    public void testGetAllTopics() {
         // sending response before request, as getTopicMetadata is a blocking call
         client.prepareResponse(
             new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
@@ -405,6 +407,63 @@ public class FetcherTest {
         assertEquals(cluster.topics().size(), allTopics.size());
     }
 
+    @Test
+    public void testGetAllTopicsDisconnect() {
+        // first try gets a disconnect, next succeeds
+        client.prepareResponse(null, true);
+        client.prepareResponse(new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
+        Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
+        assertEquals(cluster.topics().size(), allTopics.size());
+    }
+
+    @Test(expected = TimeoutException.class)
+    public void testGetAllTopicsTimeout() {
+        // since no response is prepared, the request should timeout
+        fetcher.getAllTopicMetadata(50L);
+    }
+
+    @Test
+    public void testGetAllTopicsUnauthorized() {
+        client.prepareResponse(new MetadataResponse(cluster,
+                Collections.singletonMap(topicName, Errors.TOPIC_AUTHORIZATION_FAILED)).toStruct());
+        try {
+            fetcher.getAllTopicMetadata(10L);
+            fail();
+        } catch (TopicAuthorizationException e) {
+            assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
+        }
+    }
+
+    @Test(expected = InvalidTopicException.class)
+    public void testGetTopicMetadataInvalidTopic() {
+        client.prepareResponse(new MetadataResponse(cluster,
+                Collections.singletonMap(topicName, Errors.INVALID_TOPIC_EXCEPTION)).toStruct());
+        fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+    }
+
+    @Test
+    public void testGetTopicMetadataUnknownTopic() {
+        Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet());
+        client.prepareResponse(new MetadataResponse(emptyCluster,
+                Collections.singletonMap(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toStruct());
+
+        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+        assertNull(topicMetadata.get(topicName));
+    }
+
+    @Test
+    public void testGetTopicMetadataLeaderNotAvailable() {
+        Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet());
+        client.prepareResponse(new MetadataResponse(emptyCluster,
+                Collections.singletonMap(topicName, Errors.LEADER_NOT_AVAILABLE)).toStruct());
+        client.prepareResponse(new MetadataResponse(this.cluster,
+                Collections.<String, Errors>emptyMap()).toStruct());
+        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+        assertTrue(topicMetadata.containsKey(topicName));
+    }
+
     /*
      * Send multiple requests. Verify that the client side quota metrics have the right values
      */
@@ -457,7 +516,7 @@ public class FetcherTest {
     }
 
     private  Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
-        return new Fetcher<byte[], byte[]>(consumerClient,
+        return new Fetcher<>(consumerClient,
                 minBytes,
                 maxWaitMs,
                 fetchSize,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
index 387e48f..533f869 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
@@ -20,18 +20,26 @@ import java.util.concurrent.TimeUnit;
 public class MockTime implements Time {
 
     private long nanos = 0;
+    private long autoTickMs = 0;
 
     public MockTime() {
         this.nanos = System.nanoTime();
     }
 
+    public MockTime(long autoTickMs) {
+        this.nanos = System.nanoTime();
+        this.autoTickMs = autoTickMs;
+    }
+
     @Override
     public long milliseconds() {
+        this.sleep(autoTickMs);
         return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
     }
 
     @Override
     public long nanoseconds() {
+        this.sleep(autoTickMs);
         return nanos;
     }
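
A quick illustration of the auto-tick behavior, which is what lets the new
timeout tests in FetcherTest above terminate without real waiting (a sketch;
only the constructor and milliseconds() come from this file):

    MockTime time = new MockTime(1);   // advance the clock 1 ms per read
    long t0 = time.milliseconds();     // each read ticks before returning
    long t1 = time.milliseconds();
    // t1 == t0 + 1: since every clock read advances time, a loop bounded by
    // "time.milliseconds() - start < timeout" eventually expires in tests
    // even though nothing actually sleeps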
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 53b6fdb..3a7e6de 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -52,11 +52,7 @@ class AdminClient(val time: Time,
       client.poll(future)
 
       if (future.succeeded())
-        return if (future.value().wasDisconnected()) {
-          throw new DisconnectException()
-        } else {
-          future.value().responseBody()
-        }
+        return future.value().responseBody()
 
       now = time.milliseconds()
     } while (now < deadline && future.exception().isInstanceOf[SendFailedException])

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c8ca2a3..26ab885 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -499,6 +499,20 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     this.consumers.head.position(tp)
   }
 
+  @Test
+  def testListOffsetsWithNoTopicAccess() {
+    val e = intercept[TopicAuthorizationException] {
+      this.consumers.head.partitionsFor(topic)
+    }
+    assertEquals(Set(topic), e.unauthorizedTopics().asScala)
+  }
+
+  @Test
+  def testListOffsetsWithTopicDescribe() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    this.consumers.head.partitionsFor(topic)
+  }
+
   def removeAllAcls() = {
     servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
       servers.head.apis.authorizer.get.removeAcls(resource)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 6fabfdc..90e9562 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -18,9 +18,9 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException}
 import org.junit.Assert._
 import org.junit.Test
 import scala.collection.mutable.Buffer
@@ -249,7 +249,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val parts = this.consumers(0).partitionsFor("part-test")
     assertNotNull(parts)
     assertEquals(2, parts.size)
-    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
+  }
+
+  @Test
+  def testPartitionsForAutoCreate() {
+    val partitions = this.consumers(0).partitionsFor("non-exist-topic")
+    assertFalse(partitions.isEmpty)
+  }
+
+  @Test(expected=classOf[InvalidTopicException])
+  def testPartitionsForInvalidTopic() {
+    this.consumers(0).partitionsFor(";3# ads,{234")
   }
 
   @Test