Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/10 19:20:37 UTC

[2/3] kafka git commit: KAFKA-1910; Refactor new consumer and fixed a bunch of corner cases / unit tests; reviewed by Onur Karaman and Jay Kreps

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
new file mode 100644
index 0000000..e972efb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.ConsumerMetadataRequest;
+import org.apache.kafka.common.requests.ConsumerMetadataResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class manages the coordination process with the consumer coordinator.
+ */
+public final class Coordinator {
+
+    private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
+
+    private final KafkaClient client;
+
+    private final Time time;
+    private final String groupId;
+    private final Metadata metadata;
+    private final Heartbeat heartbeat;
+    private final long sessionTimeoutMs;
+    private final String assignmentStrategy;
+    private final SubscriptionState subscriptions;
+    private final CoordinatorMetrics sensors;
+    private final long retryBackoffMs;
+    private Node consumerCoordinator;
+    private String consumerId;
+    private int generation;
+
+    /**
+     * Initialize the coordination manager.
+     */
+    public Coordinator(KafkaClient client,
+                       String groupId,
+                       long retryBackoffMs,
+                       long sessionTimeoutMs,
+                       String assignmentStrategy,
+                       Metadata metadata,
+                       SubscriptionState subscriptions,
+                       Metrics metrics,
+                       String metricGrpPrefix,
+                       Map<String, String> metricTags,
+                       Time time) {
+
+        this.time = time;
+        this.client = client;
+        this.generation = -1;
+        this.consumerId = "";
+        this.groupId = groupId;
+        this.metadata = metadata;
+        this.consumerCoordinator = null;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.assignmentStrategy = assignmentStrategy;
+        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
+        this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+    }
+
+    /**
+     * Assign partitions for the subscribed topics.
+     *
+     * @param subscribedTopics The subscribed topics list
+     * @param now The current time
+     * @return The assigned partition info
+     */
+    public List<TopicPartition> assignPartitions(List<String> subscribedTopics, long now) {
+
+        // send a join group request to the coordinator
+        log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
+
+        JoinGroupRequest request = new JoinGroupRequest(groupId,
+            (int) this.sessionTimeoutMs,
+            subscribedTopics,
+            this.consumerId,
+            this.assignmentStrategy);
+        ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
+
+        // process the response
+        JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
+        // TODO: needs to handle disconnects and errors
+        Errors.forCode(response.errorCode()).maybeThrow();
+        this.consumerId = response.consumerId();
+
+        // set the flag to refresh last committed offsets
+        this.subscriptions.needRefreshCommits();
+
+        log.debug("Joined group: {}", response);
+
+        // record re-assignment time
+        this.sensors.partitionReassignments.record(time.milliseconds() - now);
+
+        // return assigned partitions
+        return response.assignedPartitions();
+    }
+
+    /**
+     * Commit offsets for the specified list of topics and partitions.
+     *
+ * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
+     * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
+     * the commit succeeds.
+     *
+     * @param offsets The list of offsets per partition that should be committed.
+     * @param blocking Control whether the commit is blocking
+     * @param now The current time
+     */
+    public void commitOffsets(final Map<TopicPartition, Long> offsets, boolean blocking, long now) {
+        if (!offsets.isEmpty()) {
+            // create the offset commit request
+            Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
+            offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
+            for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+                offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), now, ""));
+            OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, this.generation, this.consumerId, offsetData);
+
+            // send request and possibly wait for response if it is blocking
+            RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets);
+
+            if (blocking) {
+                boolean done;
+                do {
+                    ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+
+                    // check for errors
+                    done = true;
+                    OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody());
+                    for (short errorCode : commitResponse.responseData().values()) {
+                        if (errorCode != Errors.NONE.code())
+                            done = false;
+                    }
+                    if (!done) {
+                        log.debug("Error in offset commit, backing off for {} ms before retrying again.",
+                            this.retryBackoffMs);
+                        Utils.sleep(this.retryBackoffMs);
+                    }
+                } while (!done);
+            } else {
+                this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now));
+            }
+        }
+    }
+
+    /**
+     * Fetch the committed offsets of the given set of partitions.
+     *
+     * @param partitions The set of partitions for which to fetch committed offsets
+     * @param now The current time
+     * @return The fetched offset values
+     */
+    public Map<TopicPartition, Long> fetchOffsets(Set<TopicPartition> partitions, long now) {
+        log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
+
+        while (true) {
+            // construct the request
+            OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+
+            // send the request and block on waiting for response
+            ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.OFFSET_FETCH, request.toStruct(), null, now);
+
+            // parse the response to get the offsets
+            boolean offsetsReady = true;
+            OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
+            // TODO: needs to handle disconnects
+            Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData data = entry.getValue();
+                if (data.hasError()) {
+                    log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
+                        .exception()
+                        .getMessage());
+                    if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
+                        // just retry
+                        offsetsReady = false;
+                        Utils.sleep(this.retryBackoffMs);
+                    } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                        // re-discover the coordinator and retry
+                        coordinatorDead();
+                        offsetsReady = false;
+                        Utils.sleep(this.retryBackoffMs);
+                    } else if (data.errorCode == Errors.NO_OFFSETS_FETCHABLE.code()
+                            || data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                        // just ignore this partition
+                        log.debug("No committed offset for partition " + tp);
+                    } else {
+                        throw new IllegalStateException("Unexpected error code " + data.errorCode + " while fetching offset");
+                    }
+                } else if (data.offset >= 0) {
+                    // record the position with the offset (-1 seems to indicate no
+                    // such offset known)
+                    offsets.put(tp, data.offset);
+                } else {
+                    log.debug("No committed offset for partition " + tp);
+                }
+            }
+
+            if (offsetsReady)
+                return offsets;
+        }
+    }
+
+    /**
+     * Attempt to heartbeat the consumer coordinator if necessary, and check if the coordinator is still alive.
+     *
+     * @param now The current time
+     */
+    public void maybeHeartbeat(long now) {
+        if (heartbeat.shouldHeartbeat(now)) {
+            HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
+            this.client.send(initiateCoordinatorRequest(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now));
+            this.heartbeat.sentHeartbeat(now);
+        }
+    }
+
+    public boolean coordinatorUnknown() {
+        return this.consumerCoordinator == null;
+    }
+
+    /**
+     * Repeatedly attempt to send a request to the coordinator until a response is received (retry if we are
+     * disconnected). Note that this means any requests sent this way must be idempotent.
+     *
+     * @return The response
+     */
+    private ClientResponse blockingCoordinatorRequest(ApiKeys api,
+                                                      Struct request,
+                                                      RequestCompletionHandler handler,
+                                                      long now) {
+        while (true) {
+            ClientRequest coordinatorRequest = initiateCoordinatorRequest(api, request, handler, now);
+            ClientResponse coordinatorResponse = sendAndReceive(coordinatorRequest, now);
+            if (coordinatorResponse.wasDisconnected()) {
+                handleCoordinatorDisconnect(coordinatorResponse);
+                Utils.sleep(this.retryBackoffMs);
+            } else {
+                return coordinatorResponse;
+            }
+        }
+    }
+
+    /**
+     * Ensure the consumer coordinator is known and we have a ready connection to it.
+     */
+    private void ensureCoordinatorReady() {
+        while (true) {
+            if (this.consumerCoordinator == null)
+                discoverCoordinator();
+
+            while (true) {
+                boolean ready = this.client.ready(this.consumerCoordinator, time.milliseconds());
+                if (ready) {
+                    return;
+                } else {
+                    log.debug("No connection to coordinator, attempting to connect.");
+                    this.client.poll(this.retryBackoffMs, time.milliseconds());
+
+                    // if the coordinator connection has failed, we need to
+                    // break the inner loop to re-discover the coordinator
+                    if (this.client.connectionFailed(this.consumerCoordinator)) {
+                        log.debug("Coordinator connection failed. Attempting to re-discover.");
+                        coordinatorDead();
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Mark the current coordinator as dead.
+     */
+    private void coordinatorDead() {
+        if (this.consumerCoordinator != null) {
+            log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
+            this.consumerCoordinator = null;
+        }
+    }
+
+    /**
+     * Keep discovering the consumer coordinator until it is found.
+     */
+    private void discoverCoordinator() {
+        while (this.consumerCoordinator == null) {
+            log.debug("No coordinator known, attempting to discover one.");
+            Node coordinator = fetchConsumerCoordinator();
+
+            if (coordinator == null) {
+                log.debug("No coordinator found, backing off.");
+                Utils.sleep(this.retryBackoffMs);
+            } else {
+                log.debug("Found coordinator: " + coordinator);
+                this.consumerCoordinator = coordinator;
+            }
+        }
+    }
+
+    /**
+     * Get the current consumer coordinator information via a consumer metadata request.
+     *
+     * @return the consumer coordinator node
+     */
+    private Node fetchConsumerCoordinator() {
+
+        // initiate the consumer metadata request
+        ClientRequest request = initiateConsumerMetadataRequest();
+
+        // send the request and wait for its response
+        ClientResponse response = sendAndReceive(request, request.createdTime());
+
+        // parse the response to get the coordinator info if it is not disconnected,
+        // otherwise we need to request metadata update
+        if (!response.wasDisconnected()) {
+            ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(response.responseBody());
+            // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
+            // for the coordinator in the underlying network client layer
+            // TODO: this needs to be better handled in KAFKA-1935
+            if (consumerMetadataResponse.errorCode() == Errors.NONE.code())
+                return new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
+                    consumerMetadataResponse.node().host(),
+                    consumerMetadataResponse.node().port());
+        } else {
+            this.metadata.requestUpdate();
+        }
+
+        return null;
+    }
+
+    /**
+     * Handle the case when the request gets cancelled due to coordinator disconnection.
+     */
+    private void handleCoordinatorDisconnect(ClientResponse response) {
+        int correlation = response.request().request().header().correlationId();
+        log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
+            response.request(),
+            correlation,
+            response.request().request().destination());
+
+        // mark the coordinator as dead
+        coordinatorDead();
+    }
+
+    /**
+     * Initiate a consumer metadata request to the least loaded node.
+     *
+     * @return The created request
+     */
+    private ClientRequest initiateConsumerMetadataRequest() {
+
+        // find a node to ask about the coordinator
+        Node node = this.client.leastLoadedNode(time.milliseconds());
+        while (node == null || !this.client.ready(node, time.milliseconds())) {
+            long now = time.milliseconds();
+            this.client.poll(this.retryBackoffMs, now);
+            node = this.client.leastLoadedNode(now);
+
+            // if there is no ready node, backoff before retry
+            if (node == null)
+                Utils.sleep(this.retryBackoffMs);
+        }
+
+        // create a consumer metadata request
+        log.debug("Issuing consumer metadata request to broker {}", node.id());
+
+        ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.groupId);
+        RequestSend send = new RequestSend(node.id(),
+            this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
+            request.toStruct());
+        long now = time.milliseconds();
+        return new ClientRequest(now, true, send, null);
+    }
+
+    /**
+     * Initiate a request to the coordinator.
+     */
+    private ClientRequest initiateCoordinatorRequest(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
+
+        // first make sure the coordinator is known and ready
+        ensureCoordinatorReady();
+
+        // create the request for the coordinator
+        log.debug("Issuing request ({}: {}) to coordinator {}", api, request, this.consumerCoordinator.id());
+
+        RequestHeader header = this.client.nextRequestHeader(api);
+        RequestSend send = new RequestSend(this.consumerCoordinator.id(), header, request);
+        return new ClientRequest(now, true, send, handler);
+    }
+
+    /**
+     * Attempt to send a request and receive its response.
+     *
+     * @return The response
+     */
+    private ClientResponse sendAndReceive(ClientRequest clientRequest, long now) {
+
+        // send the request
+        this.client.send(clientRequest);
+
+        // drain all responses from the destination node
+        List<ClientResponse> responses = this.client.completeAll(clientRequest.request().destination(), now);
+        if (responses.isEmpty()) {
+            throw new IllegalStateException("This should not happen.");
+        } else {
+            // other requests should be handled by the callback, and
+            // we only care about the response of the last request
+            return responses.get(responses.size() - 1);
+        }
+    }
+
+    private class HeartbeatCompletionHandler implements RequestCompletionHandler {
+        @Override
+        public void onComplete(ClientResponse resp) {
+            if (resp.wasDisconnected()) {
+                handleCoordinatorDisconnect(resp);
+            } else {
+                HeartbeatResponse response = new HeartbeatResponse(resp.responseBody());
+                if (response.errorCode() == Errors.NONE.code()) {
+                    log.debug("Received successful heartbeat response.");
+                } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+                    || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                    coordinatorDead();
+                } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
+                    subscriptions.needReassignment();
+                } else {
+                    throw new KafkaException("Unexpected error in heartbeat response: "
+                        + Errors.forCode(response.errorCode()).exception().getMessage());
+                }
+            }
+            sensors.heartbeatLatency.record(resp.requestLatencyMs());
+        }
+    }
+
+    private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
+
+        private final Map<TopicPartition, Long> offsets;
+
+        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets) {
+            this.offsets = offsets;
+        }
+
+        @Override
+        public void onComplete(ClientResponse resp) {
+            if (resp.wasDisconnected()) {
+                handleCoordinatorDisconnect(resp);
+            } else {
+                OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
+                for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
+                    TopicPartition tp = entry.getKey();
+                    short errorCode = entry.getValue();
+                    long offset = this.offsets.get(tp);
+                    if (errorCode == Errors.NONE.code()) {
+                        log.debug("Committed offset {} for partition {}", offset, tp);
+                        subscriptions.committed(tp, offset);
+                    } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+                        || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                        coordinatorDead();
+                    } else {
+                        log.error("Error committing partition {} at offset {}: {}",
+                            tp,
+                            offset,
+                            Errors.forCode(errorCode).exception().getMessage());
+                    }
+                }
+            }
+            sensors.commitLatency.record(resp.requestLatencyMs());
+        }
+    }
+
+    private class CoordinatorMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public final Sensor commitLatency;
+        public final Sensor heartbeatLatency;
+        public final Sensor partitionReassignments;
+
+        public CoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+            this.commitLatency = metrics.sensor("commit-latency");
+            this.commitLatency.add(new MetricName("commit-latency-avg",
+                this.metricGrpName,
+                "The average time taken for a commit request",
+                tags), new Avg());
+            this.commitLatency.add(new MetricName("commit-latency-max",
+                this.metricGrpName,
+                "The max time taken for a commit request",
+                tags), new Max());
+            this.commitLatency.add(new MetricName("commit-rate",
+                this.metricGrpName,
+                "The number of commit calls per second",
+                tags), new Rate(new Count()));
+
+            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
+            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
+                this.metricGrpName,
+                "The max time taken to receive a response to a hearbeat request",
+                tags), new Max());
+            this.heartbeatLatency.add(new MetricName("heartbeat-rate",
+                this.metricGrpName,
+                "The average number of heartbeats per second",
+                tags), new Rate(new Count()));
+
+            this.partitionReassignments = metrics.sensor("reassignment-latency");
+            this.partitionReassignments.add(new MetricName("reassignment-time-avg",
+                this.metricGrpName,
+                "The average time taken for a partition reassignment",
+                tags), new Avg());
+            this.partitionReassignments.add(new MetricName("reassignment-time-max",
+                this.metricGrpName,
+                "The max time taken for a partition reassignment",
+                tags), new Max());
+            this.partitionReassignments.add(new MetricName("reassignment-rate",
+                this.metricGrpName,
+                "The number of partition reassignments per second",
+                tags), new Rate(new Count()));
+
+            Measurable numParts =
+                new Measurable() {
+                    public double measure(MetricConfig config, long now) {
+                        return subscriptions.assignedPartitions().size();
+                    }
+                };
+            metrics.addMetric(new MetricName("assigned-partitions",
+                this.metricGrpName,
+                "The number of partitions currently assigned to this consumer",
+                tags),
+                numParts);
+
+            Measurable lastHeartbeat =
+                new Measurable() {
+                    public double measure(MetricConfig config, long now) {
+                        return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+                    }
+                };
+            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
+                this.metricGrpName,
+                "The number of seconds since the last controller heartbeat",
+                tags),
+                lastHeartbeat);
+        }
+    }
+}
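
A minimal sketch of how a consumer poll loop might drive this new Coordinator, assuming an instance has already been constructed with its collaborators; coordinator, subscribedTopics and offsetsToCommit are assumed to be in scope and the surrounding loop is illustrative, not part of this patch:

    long now = time.milliseconds();

    // (Re-)join the group and obtain this consumer's partition assignment
    List<TopicPartition> assigned = coordinator.assignPartitions(subscribedTopics, now);

    // Fetch the last committed offsets for the assigned partitions
    Map<TopicPartition, Long> committed =
        coordinator.fetchOffsets(new HashSet<TopicPartition>(assigned), now);

    // Then, on every pass through the poll loop:
    coordinator.maybeHeartbeat(time.milliseconds());                        // keep the session alive
    coordinator.commitOffsets(offsetsToCommit, true, time.milliseconds());  // blocking commit, retried until it succeeds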

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
new file mode 100644
index 0000000..27c78b8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.LogEntry;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * This class manages the fetching process with the brokers.
+ */
+public class Fetcher<K, V> {
+
+    private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
+    private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
+    private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+
+
+    private final KafkaClient client;
+
+    private final Time time;
+    private final int minBytes;
+    private final int maxWaitMs;
+    private final int fetchSize;
+    private final boolean checkCrcs;
+    private final long retryBackoffMs;
+    private final Metadata metadata;
+    private final FetchManagerMetrics sensors;
+    private final SubscriptionState subscriptions;
+    private final List<PartitionRecords<K, V>> records;
+    private final AutoOffsetResetStrategy offsetResetStrategy;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+
+
+    public Fetcher(KafkaClient client,
+                   long retryBackoffMs,
+                   int minBytes,
+                   int maxWaitMs,
+                   int fetchSize,
+                   boolean checkCrcs,
+                   String offsetReset,
+                   Deserializer<K> keyDeserializer,
+                   Deserializer<V> valueDeserializer,
+                   Metadata metadata,
+                   SubscriptionState subscriptions,
+                   Metrics metrics,
+                   String metricGrpPrefix,
+                   Map<String, String> metricTags,
+                   Time time) {
+
+        this.time = time;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.minBytes = minBytes;
+        this.maxWaitMs = maxWaitMs;
+        this.fetchSize = fetchSize;
+        this.checkCrcs = checkCrcs;
+        this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(offsetReset);
+
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+
+        this.records = new LinkedList<PartitionRecords<K, V>>();
+        this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
+    }
+
+    /**
+     * Set up a fetch request for any node that we have assigned partitions for and that does not already have a request in flight.
+     *
+     * @param cluster The current cluster metadata
+     * @param now The current time
+     */
+    public void initFetches(Cluster cluster, long now) {
+        for (ClientRequest request : createFetchRequests(cluster)) {
+            Node node = cluster.nodeById(request.request().destination());
+            if (client.ready(node, now)) {
+                log.trace("Initiating fetch to node {}: {}", node.id(), request);
+                client.send(request);
+            }
+        }
+    }
+
+    /**
+     * Return the fetched records, empty the record buffer and update the consumed position.
+     *
+     * @return The fetched records per partition
+     */
+    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
+        if (this.subscriptions.partitionAssignmentNeeded()) {
+            return Collections.emptyMap();
+        } else {
+            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+            for (PartitionRecords<K, V> part : this.records) {
+                Long consumed = subscriptions.consumed(part.partition);
+                if (this.subscriptions.assignedPartitions().contains(part.partition)
+                    && (consumed == null || part.fetchOffset == consumed)) {
+                    List<ConsumerRecord<K, V>> records = drained.get(part.partition);
+                    if (records == null) {
+                        records = part.records;
+                        drained.put(part.partition, records);
+                    } else {
+                        records.addAll(part.records);
+                    }
+                    subscriptions.consumed(part.partition, part.records.get(part.records.size() - 1).offset() + 1);
+                } else {
+                    // these records aren't next in line based on the last consumed position, ignore them
+                    // they must be from an obsolete request
+                    log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
+                }
+            }
+            this.records.clear();
+            return drained;
+        }
+    }
+
+    /**
+     * Reset offsets for the given partition using the offset reset strategy.
+     *
+     * @param partition The partition whose offset needs to be reset
+     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+     */
+    public void resetOffset(TopicPartition partition) {
+        long timestamp;
+        if (this.offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST)
+            timestamp = EARLIEST_OFFSET_TIMESTAMP;
+        else if (this.offsetResetStrategy == AutoOffsetResetStrategy.LATEST)
+            timestamp = LATEST_OFFSET_TIMESTAMP;
+        else
+            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+
+        log.debug("Resetting offset for partition {} to {} offset.", partition, this.offsetResetStrategy.name()
+            .toLowerCase());
+        this.subscriptions.seek(partition, offsetBefore(partition, timestamp));
+    }
+
+    /**
+     * Fetch a single offset before the given timestamp for the partition.
+     *
+     * @param topicPartition The partition for which to fetch the offset.
+     * @param timestamp The timestamp for fetching the offset.
+     * @return The offset of the message published before the given timestamp
+     */
+    public long offsetBefore(TopicPartition topicPartition, long timestamp) {
+        log.debug("Fetching offsets for partition {}.", topicPartition);
+        Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
+        partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
+        while (true) {
+            long now = time.milliseconds();
+            PartitionInfo info = metadata.fetch().partition(topicPartition);
+            if (info == null) {
+                metadata.add(topicPartition.topic());
+                log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
+                awaitMetadataUpdate();
+            } else if (info.leader() == null) {
+                log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
+                awaitMetadataUpdate();
+            } else if (this.client.ready(info.leader(), now)) {
+                Node node = info.leader();
+                ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
+                RequestSend send = new RequestSend(node.id(),
+                    this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
+                    request.toStruct());
+                ClientRequest clientRequest = new ClientRequest(now, true, send, null);
+                this.client.send(clientRequest);
+                List<ClientResponse> responses = this.client.completeAll(node.id(), now);
+                if (responses.isEmpty())
+                    throw new IllegalStateException("This should not happen.");
+                ClientResponse response = responses.get(responses.size() - 1);
+                if (response.wasDisconnected()) {
+                    awaitMetadataUpdate();
+                } else {
+                    ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
+                    short errorCode = lor.responseData().get(topicPartition).errorCode;
+                    if (errorCode == Errors.NONE.code()) {
+                        List<Long> offsets = lor.responseData().get(topicPartition).offsets;
+                        if (offsets.size() != 1)
+                            throw new IllegalStateException("This should not happen.");
+                        long offset = offsets.get(0);
+                        log.debug("Fetched offset {} for partition {}", offset, topicPartition);
+                        return offset;
+                    } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+                        || errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
+                        log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+                            topicPartition);
+                        awaitMetadataUpdate();
+                    } else {
+                        Errors.forCode(errorCode).maybeThrow();
+                    }
+                }
+            } else {
+                log.debug("Leader for partition {} is not ready, retry fetching offsets", topicPartition);
+                client.poll(this.retryBackoffMs, now);
+            }
+        }
+    }
+
+    /**
+     * Create fetch requests for all nodes for which we have assigned partitions
+     * that have no existing requests in flight.
+     */
+    private List<ClientRequest> createFetchRequests(Cluster cluster) {
+        // create the fetch info
+        Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>();
+        for (TopicPartition partition : subscriptions.assignedPartitions()) {
+            Node node = cluster.leaderFor(partition);
+            // if there is a leader and no in-flight requests, issue a new fetch
+            if (node != null && this.client.inFlightRequestCount(node.id()) == 0) {
+                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
+                if (fetch == null) {
+                    fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
+                    fetchable.put(node.id(), fetch);
+                }
+                long offset = this.subscriptions.fetched(partition);
+                fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
+            }
+        }
+
+        // create the requests
+        List<ClientRequest> requests = new ArrayList<ClientRequest>(fetchable.size());
+        for (Map.Entry<Integer, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
+            int nodeId = entry.getKey();
+            final FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
+            RequestSend send = new RequestSend(nodeId, this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
+            RequestCompletionHandler handler = new RequestCompletionHandler() {
+                public void onComplete(ClientResponse response) {
+                    handleFetchResponse(response, fetch);
+                }
+            };
+            requests.add(new ClientRequest(time.milliseconds(), true, send, handler));
+        }
+        return requests;
+    }
+
+    /**
+     * The callback for fetch completion
+     */
+    private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
+        if (resp.wasDisconnected()) {
+            int correlation = resp.request().request().header().correlationId();
+            log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected",
+                resp.request(), correlation, resp.request().request().destination());
+        } else {
+            int totalBytes = 0;
+            int totalCount = 0;
+            FetchResponse response = new FetchResponse(resp.responseBody());
+            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+                TopicPartition tp = entry.getKey();
+                FetchResponse.PartitionData partition = entry.getValue();
+                if (!subscriptions.assignedPartitions().contains(tp)) {
+                    log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
+                } else if (partition.errorCode == Errors.NONE.code()) {
+                    int bytes = 0;
+                    ByteBuffer buffer = partition.recordSet;
+                    MemoryRecords records = MemoryRecords.readableRecords(buffer);
+                    long fetchOffset = request.fetchData().get(tp).offset;
+                    List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
+                    for (LogEntry logEntry : records) {
+                        parsed.add(parseRecord(tp, logEntry));
+                        bytes += logEntry.size();
+                    }
+                    if (parsed.size() > 0) {
+                        ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
+                        this.subscriptions.fetched(tp, record.offset() + 1);
+                        this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
+                        this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
+                    }
+                    this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
+                    totalBytes += bytes;
+                    totalCount += parsed.size();
+                } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+                    || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                    this.metadata.requestUpdate();
+                } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
+                    // TODO: this could be optimized by grouping all out-of-range partitions
+                    log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
+                    resetOffset(tp);
+                } else if (partition.errorCode == Errors.UNKNOWN.code()) {
+                    log.warn("Unknown error fetching data for topic-partition {}", tp);
+                } else {
+                    throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
+                }
+            }
+            this.sensors.bytesFetched.record(totalBytes);
+            this.sensors.recordsFetched.record(totalCount);
+        }
+        this.sensors.fetchLatency.record(resp.requestLatencyMs());
+    }
+
+    /**
+     * Parse the record entry, deserializing the key / value fields if necessary
+     */
+    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
+        if (this.checkCrcs)
+            logEntry.record().ensureValid();
+
+        long offset = logEntry.offset();
+        ByteBuffer keyBytes = logEntry.record().key();
+        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
+        ByteBuffer valueBytes = logEntry.record().value();
+        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
+
+        return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
+    }
+
+    /*
+     * Request a metadata update and wait until it has occurred
+     */
+    private void awaitMetadataUpdate() {
+        int version = this.metadata.requestUpdate();
+        do {
+            long now = time.milliseconds();
+            this.client.poll(this.retryBackoffMs, now);
+        } while (this.metadata.version() == version);
+    }
+
+    private static class PartitionRecords<K, V> {
+        public long fetchOffset;
+        public TopicPartition partition;
+        public List<ConsumerRecord<K, V>> records;
+
+        public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
+            this.fetchOffset = fetchOffset;
+            this.partition = partition;
+            this.records = records;
+        }
+    }
+
+    private static enum AutoOffsetResetStrategy {
+        LATEST, EARLIEST, NONE
+    }
+
+    private class FetchManagerMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public final Sensor bytesFetched;
+        public final Sensor recordsFetched;
+        public final Sensor fetchLatency;
+        public final Sensor recordsFetchLag;
+
+
+        public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
+
+            this.bytesFetched = metrics.sensor("bytes-fetched");
+            this.bytesFetched.add(new MetricName("fetch-size-avg",
+                this.metricGrpName,
+                "The average number of bytes fetched per request",
+                tags), new Avg());
+            this.bytesFetched.add(new MetricName("fetch-size-max",
+                this.metricGrpName,
+                "The maximum number of bytes fetched per request",
+                tags), new Max());
+            this.bytesFetched.add(new MetricName("bytes-consumed-rate",
+                this.metricGrpName,
+                "The average number of bytes consumed per second",
+                tags), new Rate());
+
+            this.recordsFetched = metrics.sensor("records-fetched");
+            this.recordsFetched.add(new MetricName("records-per-request-avg",
+                this.metricGrpName,
+                "The average number of records in each request",
+                tags), new Avg());
+            this.recordsFetched.add(new MetricName("records-consumed-rate",
+                this.metricGrpName,
+                "The average number of records consumed per second",
+                tags), new Rate());
+
+            this.fetchLatency = metrics.sensor("fetch-latency");
+            this.fetchLatency.add(new MetricName("fetch-latency-avg",
+                this.metricGrpName,
+                "The average time taken for a fetch request.",
+                tags), new Avg());
+            this.fetchLatency.add(new MetricName("fetch-latency-max",
+                this.metricGrpName,
+                "The max time taken for any fetch request.",
+                tags), new Max());
+            this.fetchLatency.add(new MetricName("fetch-rate",
+                this.metricGrpName,
+                "The number of fetch requests per second.",
+                tags), new Rate(new Count()));
+
+            this.recordsFetchLag = metrics.sensor("records-lag");
+            this.recordsFetchLag.add(new MetricName("records-lag-max",
+                this.metricGrpName,
+                "The maximum lag in terms of number of records for any partition in this window",
+                tags), new Max());
+        }
+
+        public void recordTopicFetchMetrics(String topic, int bytes, int records) {
+            // record bytes fetched
+            String name = "topic." + topic + ".bytes-fetched";
+            Sensor bytesFetched = this.metrics.getSensor(name);
+            if (bytesFetched == null)
+                bytesFetched = this.metrics.sensor(name);
+            bytesFetched.record(bytes);
+
+            // record records fetched
+            name = "topic." + topic + ".records-fetched";
+            Sensor recordsFetched = this.metrics.getSensor(name);
+            if (recordsFetched == null)
+                recordsFetched = this.metrics.sensor(name);
+            recordsFetched.record(records);
+        }
+    }
+}
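
Along the same lines, a rough sketch of how the poll loop might drive the Fetcher, assuming a constructed Fetcher<String, String> and up-to-date cluster metadata; the variable names and the poll timeout are illustrative, not part of the patch:

    long now = time.milliseconds();
    Cluster cluster = metadata.fetch();

    // Send a fetch to every ready node that has assigned partitions
    // and no request currently in flight
    fetcher.initFetches(cluster, now);

    // Let the underlying network client do the actual I/O
    client.poll(pollTimeoutMs, now);

    // Drain whatever has arrived, keyed by partition; this also advances
    // the consumed position of each partition that returned data
    Map<TopicPartition, List<ConsumerRecord<String, String>>> records = fetcher.fetchedRecords();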

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index ee0751e..e7cfaaa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -21,34 +21,20 @@ public final class Heartbeat {
      * so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
      * once per second.
      */
-    private final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
+    public final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
 
     private final long timeout;
     private long lastHeartbeatSend;
-    private long lastHeartbeatResponse;
 
     public Heartbeat(long timeout, long now) {
         this.timeout = timeout;
         this.lastHeartbeatSend = now;
-        this.lastHeartbeatResponse = now;
     }
 
     public void sentHeartbeat(long now) {
         this.lastHeartbeatSend = now;
     }
 
-    public void receivedResponse(long now) {
-        this.lastHeartbeatResponse = now;
-    }
-
-    public void markDead() {
-        this.lastHeartbeatResponse = -1;
-    }
-
-    public boolean isAlive(long now) {
-        return now - lastHeartbeatResponse <= timeout;
-    }
-
     public boolean shouldHeartbeat(long now) {
         return now - lastHeartbeatSend > (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * this.timeout;
     }
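
To make the timing concrete: with HEARTBEATS_PER_SESSION_INTERVAL = 3, a heartbeat becomes due once more than a third of the session timeout has elapsed since the last send. A small illustrative walk-through, using an arbitrary 30 second session timeout:

    Heartbeat heartbeat = new Heartbeat(30000, 0L);   // session timeout 30s, created at t=0

    heartbeat.shouldHeartbeat(5000);     // false: only 5s of the 10s interval has elapsed
    heartbeat.shouldHeartbeat(10001);    // true: more than 30000 / 3 = 10000 ms since the last send

    heartbeat.sentHeartbeat(10001);      // record the send time
    heartbeat.shouldHeartbeat(15000);    // false; not due again until now exceeds 20001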

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index d41d306..cee7541 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -43,9 +43,12 @@ public class SubscriptionState {
     /* the last committed offset for each partition */
     private final Map<TopicPartition, Long> committed;
 
-    /* do we need to request a partition assignment from the co-ordinator? */
+    /* do we need to request a partition assignment from the coordinator? */
     private boolean needsPartitionAssignment;
 
+    /* do we need to request the latest committed offsets from the coordinator? */
+    private boolean needsFetchCommittedOffsets;
+
     public SubscriptionState() {
         this.subscribedTopics = new HashSet<String>();
         this.subscribedPartitions = new HashSet<TopicPartition>();
@@ -54,6 +57,7 @@ public class SubscriptionState {
         this.fetched = new HashMap<TopicPartition, Long>();
         this.committed = new HashMap<TopicPartition, Long>();
         this.needsPartitionAssignment = false;
+        this.needsFetchCommittedOffsets = true; // initialize to true so that the consumer fetches committed offsets upon startup
     }
 
     public void subscribe(String topic) {
@@ -75,6 +79,10 @@ public class SubscriptionState {
                 clearPartition(tp);
     }
 
+    public void needReassignment() {
+        this.needsPartitionAssignment = true;
+    }
+
     public void subscribe(TopicPartition tp) {
         if (this.subscribedTopics.size() > 0)
             throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
@@ -119,11 +127,20 @@ public class SubscriptionState {
 
     public void committed(TopicPartition tp, long offset) {
         this.committed.put(tp, offset);
+        this.needsFetchCommittedOffsets = false;
     }
 
     public Long committed(TopicPartition tp) {
         return this.committed.get(tp);
     }
+
+    public void needRefreshCommits() {
+        this.needsFetchCommittedOffsets = true;
+    }
+
+    public boolean refreshCommitsNeeded() {
+        return this.needsFetchCommittedOffsets;
+    }
     
     public void seek(TopicPartition tp, long offset) {
         fetched(tp, offset);
@@ -162,7 +179,7 @@ public class SubscriptionState {
         return copy;
     }
 
-    public boolean needsPartitionAssignment() {
+    public boolean partitionAssignmentNeeded() {
         return this.needsPartitionAssignment;
     }
 

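The new flag acts as a simple latch: needRefreshCommits() raises it (for example after joining a group), and recording a committed offset via committed(tp, offset) lowers it. A hedged sketch of the intended interaction, with tp standing in for some TopicPartition:

    SubscriptionState subscriptions = new SubscriptionState();

    subscriptions.refreshCommitsNeeded();   // true on startup, so committed offsets are fetched first
    subscriptions.committed(tp, 42L);       // recording a committed offset clears the flag
    subscriptions.refreshCommitsNeeded();   // now false

    subscriptions.needRefreshCommits();     // raised again, e.g. after a rebalance
    subscriptions.refreshCommitsNeeded();   // true until the next committed(...) call
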
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 7397e56..feda9c9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -162,21 +162,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *                         be called in the producer when the serializer is passed in directly.
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        this(new ProducerConfig(addSerializerToConfig(configs, keySerializer, valueSerializer)),
+        this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
              keySerializer, valueSerializer);
     }
 
-    private static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
-                                                      Serializer<?> keySerializer, Serializer<?> valueSerializer) {
-        Map<String, Object> newConfigs = new HashMap<String, Object>();
-        newConfigs.putAll(configs);
-        if (keySerializer != null)
-            newConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
-        if (valueSerializer != null)
-            newConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
-        return newConfigs;
-    }
-
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
      * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
@@ -196,21 +185,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *                         be called in the producer when the serializer is passed in directly.
      */
     public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        this(new ProducerConfig(addSerializerToConfig(properties, keySerializer, valueSerializer)),
+        this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
              keySerializer, valueSerializer);
     }
 
-    private static Properties addSerializerToConfig(Properties properties,
-                                                    Serializer<?> keySerializer, Serializer<?> valueSerializer) {
-        Properties newProperties = new Properties();
-        newProperties.putAll(properties);
-        if (keySerializer != null)
-            newProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
-        if (valueSerializer != null)
-            newProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
-        return newProperties;
-    }
-
     @SuppressWarnings("unchecked")
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         log.trace("Starting the Kafka producer");

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 122375c..fa9daae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -16,13 +16,16 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Serializer;
 
 /**
  * Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a
@@ -166,6 +169,7 @@ public class ProducerConfig extends AbstractConfig {
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
     private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
 
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -217,6 +221,28 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
     }
 
+    public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
+                                                            Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+        Map<String, Object> newConfigs = new HashMap<String, Object>();
+        newConfigs.putAll(configs);
+        if (keySerializer != null)
+            newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
+        if (valueSerializer != null)
+            newConfigs.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
+        return newConfigs;
+    }
+
+    public static Properties addSerializerToConfig(Properties properties,
+                                                   Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+        Properties newProperties = new Properties();
+        newProperties.putAll(properties);
+        if (keySerializer != null)
+            newProperties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
+        if (valueSerializer != null)
+            newProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
+        return newProperties;
+    }
+
     ProducerConfig(Map<? extends Object, ? extends Object> props) {
         super(CONFIG, props);
     }

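Since addSerializerToConfig is now a public static helper on ProducerConfig, it can be reused outside KafkaProducer. A small sketch of the Properties variant; the broker address and the choice of ByteArraySerializer are placeholders, while the ProducerConfig members are the ones shown above.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class SerializerConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

            // The helper copies the supplied properties and fills in the serializer
            // class names only when explicit serializer instances are passed in.
            Properties withSerializers = ProducerConfig.addSerializerToConfig(
                    props, new ByteArraySerializer(), new ByteArraySerializer());

            System.out.println(withSerializers.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
        }
    }
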
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index ed9c63a..03df9ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -298,9 +298,8 @@ public class Sender implements Runnable {
         final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
         for (RecordBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
-            ByteBuffer recordsBuffer = batch.records.buffer();
-            recordsBuffer.flip();
-            produceRecordsByPartition.put(tp, recordsBuffer);
+            batch.records.rewind();
+            produceRecordsByPartition.put(tp, batch.records.buffer());
             recordsByPartition.put(tp, batch);
         }
         ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 6baad93..57de058 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.network;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.CancelledKeyException;
@@ -90,7 +91,7 @@ public class Selector implements Selectable {
     /**
      * Create a new selector
      */
-    public Selector(Metrics metrics, Time time , String metricGrpPrefix , Map<String, String> metricTags) {
+    public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
         try {
             this.selector = java.nio.channels.Selector.open();
         } catch (IOException e) {
@@ -274,7 +275,7 @@ public class Selector implements Selectable {
                     }
                 } catch (IOException e) {
                     String desc = socketDescription(channel);
-                    if (e instanceof EOFException)
+                    if (e instanceof EOFException || e instanceof ConnectException)
                         log.info("Connection {} disconnected", desc);
                     else
                         log.warn("Error in I/O with connection to {}", desc, e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ad2171f..ce18a6c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -41,14 +41,23 @@ public enum Errors {
             new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
     NOT_LEADER_FOR_PARTITION(6,
             new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
-    REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
+    REQUEST_TIMED_OUT(7,
+            new TimeoutException("The request timed out.")),
+    // TODO: errorCode 8 for BrokerNotAvailable
+    REPLICA_NOT_AVAILABLE(9,
+            new ApiException("The replica is not available for the requested topic-partition")),
     MESSAGE_TOO_LARGE(10,
             new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
-    OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
-    NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
-    OFFSET_LOAD_IN_PROGRESS(14, new ApiException("The coordinator is loading offsets and can't process requests.")),
-    CONSUMER_COORDINATOR_NOT_AVAILABLE(15, new ApiException("The coordinator is not available.")),
-    NOT_COORDINATOR_FOR_CONSUMER(16, new ApiException("This is not the correct co-ordinator for this consumer.")),
+    OFFSET_METADATA_TOO_LARGE(12,
+            new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
+    NETWORK_EXCEPTION(13,
+            new NetworkException("The server disconnected before a response was received.")),
+    OFFSET_LOAD_IN_PROGRESS(14,
+            new ApiException("The coordinator is loading offsets and can't process requests.")),
+    CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
+            new ApiException("The coordinator is not available.")),
+    NOT_COORDINATOR_FOR_CONSUMER(16,
+            new ApiException("This is not the correct co-ordinator for this consumer.")),
     INVALID_TOPIC_EXCEPTION(17,
             new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
     RECORD_LIST_TOO_LARGE(18,
@@ -57,7 +66,12 @@ public enum Errors {
             new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
     NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
             new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
-    INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request specified an invalid value for required acks."));
+    INVALID_REQUIRED_ACKS(21,
+            new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
+    ILLEGAL_GENERATION(22,
+            new ApiException("Specified consumer generation id is not valid.")),
+    NO_OFFSETS_FETCHABLE(23,
+            new ApiException("No offsets have been committed so far."));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
@@ -68,7 +82,6 @@ public enum Errors {
             if (error.exception != null)
                 classToError.put(error.exception.getClass(), error);
         }
-
     }
 
     private final short code;

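For reference, response-handling code maps these codes back to exceptions through the pre-existing helpers on this enum; the snippet below is generic usage rather than part of the patch, and the hard-coded 22 is simply the new ILLEGAL_GENERATION value.

    import org.apache.kafka.common.protocol.Errors;

    public class ErrorCodeSketch {
        public static void main(String[] args) {
            short errorCode = 22; // e.g. ILLEGAL_GENERATION taken from a response struct
            Errors error = Errors.forCode(errorCode);
            if (error != Errors.NONE) {
                // Each code carries a matching exception instance that callers can log or throw.
                System.out.println("Broker returned: " + error.exception().getMessage());
            }
        }
    }
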
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 083e7a3..a412f61 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -120,6 +120,16 @@ public class MemoryRecords implements Records {
         buffer = compressor.buffer();
     }
 
+    /**
+     * Rewind the records to read mode; they must already be closed for writes
+     */
+    public void rewind() {
+        if (writable)
+            throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
+
+        buffer.flip();
+    }
+
     /** Write the records in this set to the given channel */
     public int writeTo(GatheringByteChannel channel) throws IOException {
         return channel.write(buffer);
@@ -158,7 +168,7 @@ public class MemoryRecords implements Records {
 
     @Override
     public Iterator<LogEntry> iterator() {
-        ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip();
+        ByteBuffer copy = this.buffer.duplicate();
         return new RecordsIterator(copy, CompressionType.NONE, false);
     }
     

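The Sender change above relies on this new method: a batch is closed for writes and then rewound, instead of Sender flipping the buffer by hand. A minimal sketch of the pattern; the emptyRecords/append signatures are taken from the class as it currently stands and should be treated as assumptions.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class RewindSketch {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
            records.append(0L, "key".getBytes(), "value".getBytes());

            // rewind() refuses to run while the records are still writable, so close first.
            records.close();
            records.rewind();

            // iterator() no longer flips the buffer itself, so the rewind above is what
            // makes the entries readable here.
            for (LogEntry entry : records)
                System.out.println("offset " + entry.offset() + ", size " + entry.record().size());
        }
    }
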
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index e67c4c8..f020aaa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -41,6 +41,17 @@ public class FetchResponse extends AbstractRequestResponse {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     *  OFFSET_OUT_OF_RANGE (1)
+     *  UNKNOWN_TOPIC_OR_PARTITION (3)
+     *  NOT_LEADER_FOR_PARTITION (6)
+     *  REPLICA_NOT_AVAILABLE (9)
+     *  UNKNOWN (-1)
+     */
+
     private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
     private static final String RECORD_SET_KEY_NAME = "record_set";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 0057496..f548cd0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -24,6 +24,12 @@ public class HeartbeatResponse extends AbstractRequestResponse {
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
+    /**
+     * Possible error codes:
+     *
+     * TODO
+     */
+
     private final short errorCode;
     public HeartbeatResponse(short errorCode) {
         super(new Struct(CURRENT_SCHEMA));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 8c50e9b..1ebc188 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -30,6 +30,8 @@ public class JoinGroupRequest extends AbstractRequestResponse {
     private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
     private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
 
+    public static final String UNKNOWN_CONSUMER_ID = "";
+
     private final String groupId;
     private final int sessionTimeout;
     private final List<String> topics;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
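A hedged sketch of how the new sentinel is intended to be used: a consumer that has not yet been assigned an id by the coordinator joins with UNKNOWN_CONSUMER_ID on its first attempt. The constructor argument order below is assumed from the field names in this class, and the group, timeout, topic, and strategy values are placeholders.

    import java.util.Collections;

    import org.apache.kafka.common.requests.JoinGroupRequest;

    public class JoinGroupSketch {
        public static void main(String[] args) {
            // Assumed argument order: groupId, sessionTimeout, topics, consumerId, strategy.
            JoinGroupRequest request = new JoinGroupRequest(
                    "my-group",
                    30000,
                    Collections.singletonList("my-topic"),
                    JoinGroupRequest.UNKNOWN_CONSUMER_ID, // first join: no id assigned yet
                    "range");                             // placeholder strategy name
            System.out.println(request.toStruct());
        }
    }
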
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 52b1803..fd9c545 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -26,6 +26,13 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * TODO
+     */
+
     private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
     private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
     private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index cfac47a..af704f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -41,6 +41,13 @@ public class ListOffsetResponse extends AbstractRequestResponse {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * TODO
+     */
+
     private static final String OFFSETS_KEY_NAME = "offsets";
 
     private final Map<TopicPartition, PartitionData> responseData;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 90f3141..36736ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -40,11 +40,25 @@ public class MetadataResponse extends AbstractRequestResponse {
 
     // topic level field names
     private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * TODO
+     */
+
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
 
     // partition level field names
     private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * TODO
+     */
+
     private static final String PARTITION_KEY_NAME = "partition_id";
     private static final String LEADER_KEY_NAME = "leader";
     private static final String REPLICAS_KEY_NAME = "replicas";

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 4d3b9ec..70844d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -38,6 +38,12 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
+    /**
+     * Possible error codes:
+     *
+     * TODO
+     */
+
     private final Map<TopicPartition, Short> responseData;
 
     public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index edbed58..f10c246 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -41,6 +41,15 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
     private static final String METADATA_KEY_NAME = "metadata";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
+    /**
+     * Possible error codes:
+     *
+     *  UNKNOWN_TOPIC_OR_PARTITION (3)
+     *  OFFSET_LOAD_IN_PROGRESS (14)
+     *  NOT_COORDINATOR_FOR_CONSUMER (16)
+     *  NO_OFFSETS_FETCHABLE (23)
+     */
+
     private final Map<TopicPartition, PartitionData> responseData;
 
     public static final class PartitionData {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index a00dcdf..4b67f70 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -37,6 +37,13 @@ public class ProduceResponse extends AbstractRequestResponse {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * TODO
+     */
+
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";
 
     private final Map<TopicPartition, PartitionResponse> responses;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8f1a7a6..5e3fab1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -35,11 +35,23 @@ import org.apache.kafka.common.utils.Time;
  */
 public class MockClient implements KafkaClient {
 
+    private class FutureResponse {
+        public final Struct responseBody;
+        public final boolean disconnected;
+
+        public FutureResponse(Struct responseBody, boolean disconnected) {
+            this.responseBody = responseBody;
+            this.disconnected = disconnected;
+        }
+    }
+
     private final Time time;
     private int correlation = 0;
+    private Node node = null;
     private final Set<Integer> ready = new HashSet<Integer>();
     private final Queue<ClientRequest> requests = new ArrayDeque<ClientRequest>();
     private final Queue<ClientResponse> responses = new ArrayDeque<ClientResponse>();
+    private final Queue<FutureResponse> futureResponses = new ArrayDeque<FutureResponse>();
 
     public MockClient(Time time) {
         this.time = time;
@@ -52,9 +64,8 @@ public class MockClient implements KafkaClient {
 
     @Override
     public boolean ready(Node node, long now) {
-        boolean found = isReady(node, now);
         ready.add(node.id());
-        return found;
+        return true;
     }
 
     @Override
@@ -62,6 +73,11 @@ public class MockClient implements KafkaClient {
         return 0;
     }
 
+    @Override
+    public boolean connectionFailed(Node node) {
+        return false;
+    }
+
     public void disconnect(Integer node) {
         Iterator<ClientRequest> iter = requests.iterator();
         while (iter.hasNext()) {
@@ -76,16 +92,25 @@ public class MockClient implements KafkaClient {
 
     @Override
     public void send(ClientRequest request) {
-        this.requests.add(request);
+        if (!futureResponses.isEmpty()) {
+            FutureResponse futureResp = futureResponses.poll();
+            ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
+            responses.add(resp);
+        } else {
+            this.requests.add(request);
+        }
     }
 
     @Override
     public List<ClientResponse> poll(long timeoutMs, long now) {
-        for (ClientResponse response: this.responses)
-            if (response.request().hasCallback()) 
+        List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses);
+
+        while (!this.responses.isEmpty()) {
+            ClientResponse response = this.responses.poll();
+            if (response.request().hasCallback())
                 response.request().callback().onComplete(response);
-        List<ClientResponse> copy = new ArrayList<ClientResponse>();
-        this.responses.clear();
+        }
+
         return copy;
     }
 
@@ -107,8 +132,24 @@ public class MockClient implements KafkaClient {
     }
 
     public void respond(Struct body) {
+        respond(body, false);
+    }
+
+    public void respond(Struct body, boolean disconnected) {
         ClientRequest request = requests.remove();
-        responses.add(new ClientResponse(request, time.milliseconds(), false, body));
+        responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
+    }
+
+    public void prepareResponse(Struct body) {
+        prepareResponse(body, false);
+    }
+
+    public void prepareResponse(Struct body, boolean disconnected) {
+        futureResponses.add(new FutureResponse(body, disconnected));
+    }
+
+    public void setNode(Node node) {
+        this.node = node;
     }
 
     @Override
@@ -136,7 +177,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public Node leastLoadedNode(long now) {
-        return null;
+        return this.node;
     }
 
 }
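
A sketch of how a unit test might use the new MockClient hooks; the MockTime clock, the node, and the canned HeartbeatResponse are placeholders chosen for illustration, while setNode and prepareResponse are the methods added above.

    import org.apache.kafka.clients.MockClient;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.HeartbeatResponse;
    import org.apache.kafka.common.utils.MockTime;

    public class MockClientSketch {
        public static void main(String[] args) {
            MockClient client = new MockClient(new MockTime());

            // leastLoadedNode() now returns whatever the test configures instead of null.
            client.setNode(new Node(0, "localhost", 9092));

            // Queue a canned response before the code under test issues its request; the
            // next send() is answered from this queue, and poll() then runs the callback.
            client.prepareResponse(new HeartbeatResponse(Errors.NONE.code()).toStruct());
        }
    }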