Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/23 06:19:53 UTC

[1/2] kafka git commit: kafka-2168; New consumer poll() can block other calls like position(), commit(), and close() indefinitely; patched by Jason Gustafson; reviewed by Jay Kreps, Ewen Cheslack-Postava, Guozhang Wang and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 2270a7537 -> b6d326b08


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 56281ee..695eaf6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,7 +19,6 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
@@ -61,9 +60,6 @@ import java.util.Map;
 public class Fetcher<K, V> {
 
     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
-    private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
-    private static final long LATEST_OFFSET_TIMESTAMP = -1L;
-
 
     private final KafkaClient client;
 
@@ -72,23 +68,19 @@ public class Fetcher<K, V> {
     private final int maxWaitMs;
     private final int fetchSize;
     private final boolean checkCrcs;
-    private final long retryBackoffMs;
     private final Metadata metadata;
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
     private final List<PartitionRecords<K, V>> records;
-    private final AutoOffsetResetStrategy offsetResetStrategy;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
 
     public Fetcher(KafkaClient client,
-                   long retryBackoffMs,
                    int minBytes,
                    int maxWaitMs,
                    int fetchSize,
                    boolean checkCrcs,
-                   String offsetReset,
                    Deserializer<K> keyDeserializer,
                    Deserializer<V> valueDeserializer,
                    Metadata metadata,
@@ -102,17 +94,16 @@ public class Fetcher<K, V> {
         this.client = client;
         this.metadata = metadata;
         this.subscriptions = subscriptions;
-        this.retryBackoffMs = retryBackoffMs;
         this.minBytes = minBytes;
         this.maxWaitMs = maxWaitMs;
         this.fetchSize = fetchSize;
         this.checkCrcs = checkCrcs;
-        this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(offsetReset);
 
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
 
         this.records = new LinkedList<PartitionRecords<K, V>>();
+
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
     }
 
@@ -166,84 +157,76 @@ public class Fetcher<K, V> {
     }
 
     /**
-     * Reset offsets for the given partition using the offset reset strategy.
-     *
-     * @param partition The given partition that needs reset offset
-     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
-     */
-    public void resetOffset(TopicPartition partition) {
-        long timestamp;
-        if (this.offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST)
-            timestamp = EARLIEST_OFFSET_TIMESTAMP;
-        else if (this.offsetResetStrategy == AutoOffsetResetStrategy.LATEST)
-            timestamp = LATEST_OFFSET_TIMESTAMP;
-        else
-            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
-
-        log.debug("Resetting offset for partition {} to {} offset.", partition, this.offsetResetStrategy.name()
-            .toLowerCase());
-        this.subscriptions.seek(partition, offsetBefore(partition, timestamp));
-    }
-
-    /**
      * Fetch a single offset before the given timestamp for the partition.
      *
      * @param topicPartition The partition whose offset should be fetched.
      * @param timestamp The target timestamp for the offset lookup.
-     * @return The offset of the message that is published before the given timestamp
+     * @return A future which can be polled to obtain the corresponding offset.
      */
-    public long offsetBefore(TopicPartition topicPartition, long timestamp) {
-        log.debug("Fetching offsets for partition {}.", topicPartition);
+    public RequestFuture<Long> listOffset(final TopicPartition topicPartition, long timestamp) {
         Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
         partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
-        while (true) {
-            long now = time.milliseconds();
-            PartitionInfo info = metadata.fetch().partition(topicPartition);
-            if (info == null) {
-                metadata.add(topicPartition.topic());
-                log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
-                awaitMetadataUpdate();
-            } else if (info.leader() == null) {
-                log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
-                awaitMetadataUpdate();
-            } else if (this.client.ready(info.leader(), now)) {
-                Node node = info.leader();
-                ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
-                RequestSend send = new RequestSend(node.idString(),
+        long now = time.milliseconds();
+        PartitionInfo info = metadata.fetch().partition(topicPartition);
+        if (info == null) {
+            metadata.add(topicPartition.topic());
+            log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
+            return RequestFuture.metadataRefreshNeeded();
+        } else if (info.leader() == null) {
+            log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
+            return RequestFuture.metadataRefreshNeeded();
+        } else if (this.client.ready(info.leader(), now)) {
+            final RequestFuture<Long> future = new RequestFuture<Long>();
+            Node node = info.leader();
+            ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
+            RequestSend send = new RequestSend(node.idString(),
                     this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
                     request.toStruct());
-                ClientRequest clientRequest = new ClientRequest(now, true, send, null);
-                this.client.send(clientRequest);
-                List<ClientResponse> responses = this.client.completeAll(node.idString(), now);
-                if (responses.isEmpty())
-                    throw new IllegalStateException("This should not happen.");
-                ClientResponse response = responses.get(responses.size() - 1);
-                if (response.wasDisconnected()) {
-                    awaitMetadataUpdate();
-                } else {
-                    ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
-                    short errorCode = lor.responseData().get(topicPartition).errorCode;
-                    if (errorCode == Errors.NONE.code()) {
-                        List<Long> offsets = lor.responseData().get(topicPartition).offsets;
-                        if (offsets.size() != 1)
-                            throw new IllegalStateException("This should not happen.");
-                        long offset = offsets.get(0);
-                        log.debug("Fetched offset {} for partition {}", offset, topicPartition);
-                        return offset;
-                    } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                            || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
-                        log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
-                            topicPartition);
-                        awaitMetadataUpdate();
-                    } else {
-                        log.error("Attempt to fetch offsets for partition {} failed due to: {}",
-                            topicPartition, Errors.forCode(errorCode).exception().getMessage());
-                        awaitMetadataUpdate();
-                    }
+            RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+                @Override
+                public void onComplete(ClientResponse resp) {
+                    handleListOffsetResponse(topicPartition, resp, future);
                 }
+            };
+            ClientRequest clientRequest = new ClientRequest(now, true, send, completionHandler);
+            this.client.send(clientRequest);
+            return future;
+        } else {
+            // We initiated a connection to the leader, but we need to poll to finish it.
+            return RequestFuture.pollNeeded();
+        }
+    }
+
+    /**
+     * Callback for the response of the list offset call above.
+     * @param topicPartition The partition that was fetched.
+     * @param clientResponse The response from the server.
+     * @param future The future to be completed with the fetched offset.
+     */
+    private void handleListOffsetResponse(TopicPartition topicPartition,
+                                          ClientResponse clientResponse,
+                                          RequestFuture<Long> future) {
+        if (clientResponse.wasDisconnected()) {
+            future.retryAfterMetadataRefresh();
+        } else {
+            ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
+            short errorCode = lor.responseData().get(topicPartition).errorCode;
+            if (errorCode == Errors.NONE.code()) {
+                List<Long> offsets = lor.responseData().get(topicPartition).offsets;
+                if (offsets.size() != 1)
+                    throw new IllegalStateException("This should not happen.");
+                long offset = offsets.get(0);
+                log.debug("Fetched offset {} for partition {}", offset, topicPartition);
+
+                future.complete(offset);
+            } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+                    || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+                        topicPartition);
+                future.retryAfterMetadataRefresh();
             } else {
-                log.debug("Leader for partition {} is not ready, retry fetching offsets", topicPartition);
-                client.poll(this.retryBackoffMs, now);
+                log.error("Attempt to fetch offsets for partition {} failed due to: {}",
+                        topicPartition, Errors.forCode(errorCode).exception().getMessage());
+                future.retryAfterMetadataRefresh();
             }
         }
     }
@@ -257,8 +240,10 @@ public class Fetcher<K, V> {
         Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>();
         for (TopicPartition partition : subscriptions.assignedPartitions()) {
             Node node = cluster.leaderFor(partition);
-            // if there is a leader and no in-flight requests, issue a new fetch
-            if (node != null && this.client.inFlightRequestCount(node.idString()) == 0) {
+            if (node == null) {
+                metadata.requestUpdate();
+            } else if (this.client.inFlightRequestCount(node.idString()) == 0) {
+                // if there is a leader and no in-flight requests, issue a new fetch
                 Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
                 if (fetch == null) {
                     fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
@@ -327,7 +312,7 @@ public class Fetcher<K, V> {
                 } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
                     // TODO: this could be optimized by grouping all out-of-range partitions
                     log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
-                    resetOffset(tp);
+                    subscriptions.needOffsetReset(tp);
                 } else if (partition.errorCode == Errors.UNKNOWN.code()) {
                     log.warn("Unknown error fetching data for topic-partition {}", tp);
                 } else {
@@ -356,17 +341,6 @@ public class Fetcher<K, V> {
         return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
     }
 
-    /*
-     * Request a metadata update and wait until it has occurred
-     */
-    private void awaitMetadataUpdate() {
-        int version = this.metadata.requestUpdate();
-        do {
-            long now = time.milliseconds();
-            this.client.poll(this.retryBackoffMs, now);
-        } while (this.metadata.version() == version);
-    }
-
     private static class PartitionRecords<K, V> {
         public long fetchOffset;
         public TopicPartition partition;
@@ -379,9 +353,6 @@ public class Fetcher<K, V> {
         }
     }
 
-    private static enum AutoOffsetResetStrategy {
-        LATEST, EARLIEST, NONE
-    }
 
     private class FetchManagerMetrics {
         public final Metrics metrics;
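
The listOffset() rewrite above is the heart of this change for the fetcher: the old blocking loop (await metadata, send, completeAll, retry) becomes a single non-blocking send whose result is delivered through a RequestFuture. A minimal sketch of the new calling pattern, assuming the fetcher, client, subscriptions and metadata fields from the diff (retryBackoffMs here is illustrative, since it moved out of Fetcher):

    // -2L requests the earliest offset, -1L the latest, matching the
    // EARLIEST/LATEST_OFFSET_TIMESTAMP constants removed from Fetcher above
    RequestFuture<Long> future = fetcher.listOffset(tp, -2L);
    while (!future.isDone())
        client.poll(retryBackoffMs, time.milliseconds());

    if (future.succeeded())
        subscriptions.seek(tp, future.value());
    else if (future.retryAction() == RequestFuture.RetryAction.REFRESH_METADATA)
        metadata.requestUpdate();  // refresh metadata, then call listOffset() again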

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index e7cfaaa..51eae19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -42,4 +42,14 @@ public final class Heartbeat {
     public long lastHeartbeatSend() {
         return this.lastHeartbeatSend;
     }
+
+    public long timeToNextHeartbeat(long now) {
+        long timeSinceLastHeartbeat = now - lastHeartbeatSend;
+
+        long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL;
+        if (timeSinceLastHeartbeat > hbInterval)
+            return 0;
+        else
+            return hbInterval - timeSinceLastHeartbeat;
+    }
 }
\ No newline at end of file
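
timeToNextHeartbeat() spreads HEARTBEATS_PER_SESSION_INTERVAL heartbeats evenly across the session timeout, so the consumer's poll loop can use it as an upper bound on how long it may block. A worked example, assuming an interval of 100ms as implied by testTimeToNextHeartbeat in HeartbeatTest below (the 40ms case is extrapolated from the formula, not taken from the test):

    heartbeat.sentHeartbeat(0);
    heartbeat.timeToNextHeartbeat(0);    // 100: a full interval remains
    heartbeat.timeToNextHeartbeat(40);   // 60: interval minus the 40ms already elapsed
    heartbeat.timeToNextHeartbeat(200);  // 0: past the interval, a heartbeat is due now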

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
new file mode 100644
index 0000000..13fc9af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Result of an asynchronous request through {@link org.apache.kafka.clients.KafkaClient}. To get the
+ * result of the request, you must poll using {@link org.apache.kafka.clients.KafkaClient#poll(long, long)}
+ * until {@link #isDone()} returns true. Typical usage might look like this:
+ *
+ * <pre>
+ *     RequestFuture future = sendRequest();
+ *     while (!future.isDone()) {
+ *         client.poll(timeout, now);
+ *     }
+ *
+ *     switch (future.outcome()) {
+ *     case SUCCESS:
+ *         // handle request success
+ *         break;
+ *     case NEED_RETRY:
+ *         // retry after taking possible retry action
+ *         break;
+ *     case EXCEPTION:
+ *         // handle exception
+ *     }
+ * </pre>
+ *
+ * When {@link #isDone()} returns true, there are three possible outcomes (obtained through {@link #outcome()}):
+ *
+ * <ol>
+ * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#SUCCESS}: If the request was
+ *    successful, then you can use {@link #value()} to obtain the result.</li>
+ * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#EXCEPTION}: If an unhandled exception
+ *    was encountered, you can use {@link #exception()} to get it.</li>
+ * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#NEED_RETRY}: The request may
+ *    not have been successful, but the failure may be ephemeral and the caller just needs to try the request again.
+ *    In this case, use {@link #retryAction()} to determine what action should be taken (if any) before
+ *    retrying.</li>
+ * </ol>
+ *
+ * @param <T> Return type of the result (can be Void if there is no response)
+ */
+public class RequestFuture<T> {
+    public static final RequestFuture<Object> NEED_NEW_COORDINATOR = newRetryFuture(RetryAction.FIND_COORDINATOR);
+    public static final RequestFuture<Object> NEED_POLL = newRetryFuture(RetryAction.POLL);
+    public static final RequestFuture<Object> NEED_METADATA_REFRESH = newRetryFuture(RetryAction.REFRESH_METADATA);
+
+    public enum RetryAction {
+        NOOP,             // Retry immediately.
+        POLL,             // Retry after calling poll (e.g. to finish a connection)
+        BACKOFF,          // Retry after a delay
+        FIND_COORDINATOR, // Find a new coordinator before retrying
+        REFRESH_METADATA  // Refresh metadata before retrying
+    }
+
+    public enum Outcome {
+        SUCCESS,
+        NEED_RETRY,
+        EXCEPTION
+    }
+
+    private Outcome outcome;
+    private RetryAction retryAction;
+    private T value;
+    private RuntimeException exception;
+
+    /**
+     * Check whether the response is ready to be handled
+     * @return true if the response is ready, false otherwise
+     */
+    public boolean isDone() {
+        return outcome != null;
+    }
+
+    /**
+     * Get the value corresponding to this request (if it has one, as indicated by {@link #outcome()}).
+     * @return the value if it exists or null
+     */
+    public T value() {
+        return value;
+    }
+
+    /**
+     * Check if the request succeeded.
+     * @return true if a value is available, false otherwise
+     */
+    public boolean succeeded() {
+        return outcome == Outcome.SUCCESS;
+    }
+
+    /**
+     * Check if the request failed.
+     * @return true if the request failed (whether or not it can be retried)
+     */
+    public boolean failed() {
+        return outcome != Outcome.SUCCESS;
+    }
+
+    /**
+     * Return the retry action from this response (assuming {@link #succeeded()} has returned false). If the
+     * response is not ready or if there is no retry action, null is returned.
+     * @return the retry action if it exists or null
+     */
+    public RetryAction retryAction() {
+        return retryAction;
+    }
+
+    /**
+     * Get the exception from a failed result. You should check that there is an exception
+     * with {@link #hasException()} before using this method.
+     * @return The exception if it exists or null
+     */
+    public RuntimeException exception() {
+        return exception;
+    }
+
+    /**
+     * Check whether there was an exception.
+     * @return true if this request failed with an exception
+     */
+    public boolean hasException() {
+        return outcome == Outcome.EXCEPTION;
+    }
+
+    /**
+     * Check the outcome of the future if it is ready.
+     * @return the outcome or null if the future is not finished
+     */
+    public Outcome outcome() {
+        return outcome;
+    }
+
+    /**
+     * The request failed, but should be retried using the provided retry action.
+     * @param retryAction The action that should be taken by the caller before retrying the request
+     */
+    public void retry(RetryAction retryAction) {
+        this.outcome = Outcome.NEED_RETRY;
+        this.retryAction = retryAction;
+    }
+
+    public void retryNow() {
+        retry(RetryAction.NOOP);
+    }
+
+    public void retryAfterBackoff() {
+        retry(RetryAction.BACKOFF);
+    }
+
+    public void retryWithNewCoordinator() {
+        retry(RetryAction.FIND_COORDINATOR);
+    }
+
+    public void retryAfterMetadataRefresh() {
+        retry(RetryAction.REFRESH_METADATA);
+    }
+
+    /**
+     * Complete the request successfully. After this call, {@link #succeeded()} will return true
+     * and the value can be obtained through {@link #value()}.
+     * @param value corresponding value (or null if there is none)
+     */
+    public void complete(T value) {
+        this.outcome = Outcome.SUCCESS;
+        this.value = value;
+    }
+
+    /**
+     * Raise an exception. The request will be marked as failed, and the caller can either
+     * handle the exception or throw it.
+     * @param e The exception that was encountered
+     */
+    public void raise(RuntimeException e) {
+        this.outcome = Outcome.EXCEPTION;
+        this.exception = e;
+    }
+
+    private static <T> RequestFuture<T> newRetryFuture(RetryAction retryAction) {
+        RequestFuture<T> result = new RequestFuture<T>();
+        result.retry(retryAction);
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> RequestFuture<T> pollNeeded() {
+        return (RequestFuture<T>) NEED_POLL;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> RequestFuture<T> metadataRefreshNeeded() {
+        return (RequestFuture<T>) NEED_METADATA_REFRESH;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> RequestFuture<T> newCoordinatorNeeded() {
+        return (RequestFuture<T>) NEED_NEW_COORDINATOR;
+    }
+
+}
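
A caller that sees Outcome.NEED_RETRY is expected to act on the RetryAction before reissuing the request. A hedged sketch of such a dispatch, using the coordinator and metadata calls that appear elsewhere in this commit (the real retry loops live in KafkaConsumer, whose diff is not shown in this part):

    if (future.failed() && !future.hasException()) {
        switch (future.retryAction()) {
            case NOOP:
                break;  // retry immediately
            case POLL:
                break;  // the next client.poll() finishes the pending connection
            case BACKOFF:
                Utils.sleep(retryBackoffMs);  // assumed backoff helper; wait, then retry
                break;
            case FIND_COORDINATOR:
                coordinator.discoverConsumerCoordinator();  // re-locate the coordinator
                break;
            case REFRESH_METADATA:
                metadata.requestUpdate();  // force a metadata refresh before retrying
                break;
        }
    }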

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index cee7541..6837453 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -12,14 +12,15 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kafka.common.TopicPartition;
-
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer
  */
@@ -49,7 +50,14 @@ public class SubscriptionState {
     /* do we need to request the latest committed offsets from the coordinator? */
     private boolean needsFetchCommittedOffsets;
 
-    public SubscriptionState() {
+    /* Partitions that need to be reset before fetching */
+    private Map<TopicPartition, OffsetResetStrategy> resetPartitions;
+
+    /* Default offset reset strategy */
+    private OffsetResetStrategy offsetResetStrategy;
+
+    public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
+        this.offsetResetStrategy = offsetResetStrategy;
         this.subscribedTopics = new HashSet<String>();
         this.subscribedPartitions = new HashSet<TopicPartition>();
         this.assignedPartitions = new HashSet<TopicPartition>();
@@ -58,6 +66,7 @@ public class SubscriptionState {
         this.committed = new HashMap<TopicPartition, Long>();
         this.needsPartitionAssignment = false;
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
+        this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>();
     }
 
     public void subscribe(String topic) {
@@ -102,12 +111,14 @@ public class SubscriptionState {
         this.committed.remove(tp);
         this.fetched.remove(tp);
         this.consumed.remove(tp);
+        this.resetPartitions.remove(tp);
     }
 
     public void clearAssignment() {
         this.assignedPartitions.clear();
         this.committed.clear();
         this.fetched.clear();
+        this.consumed.clear();
         this.needsPartitionAssignment = !subscribedTopics().isEmpty();
     }
 
@@ -145,6 +156,7 @@ public class SubscriptionState {
     public void seek(TopicPartition tp, long offset) {
         fetched(tp, offset);
         consumed(tp, offset);
+        resetPartitions.remove(tp);
     }
 
     public Set<TopicPartition> assignedPartitions() {
@@ -169,6 +181,28 @@ public class SubscriptionState {
         return this.consumed;
     }
 
+    public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
+        this.resetPartitions.put(partition, offsetResetStrategy);
+        this.fetched.remove(partition);
+        this.consumed.remove(partition);
+    }
+
+    public void needOffsetReset(TopicPartition partition) {
+        needOffsetReset(partition, offsetResetStrategy);
+    }
+
+    public boolean isOffsetResetNeeded(TopicPartition partition) {
+        return resetPartitions.containsKey(partition);
+    }
+
+    public boolean isOffsetResetNeeded() {
+        return !resetPartitions.isEmpty();
+    }
+
+    public OffsetResetStrategy resetStrategy(TopicPartition partition) {
+        return resetPartitions.get(partition);
+    }
+
     public boolean hasAllFetchPositions() {
         return this.fetched.size() >= this.assignedPartitions.size();
     }
@@ -192,4 +226,5 @@ public class SubscriptionState {
         this.needsPartitionAssignment = false;
     }
 
+
 }
\ No newline at end of file
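
With this change the fetcher only marks a partition as needing a reset (needOffsetReset above); actually performing the reset becomes the poll loop's job. A minimal sketch of how a pending reset might be resolved, assuming the listOffset() future from Fetcher and the -2L/-1L timestamp convention noted earlier (the real driver is in KafkaConsumer, not shown here):

    for (TopicPartition tp : subscriptions.assignedPartitions()) {
        if (subscriptions.isOffsetResetNeeded(tp)) {
            long timestamp = subscriptions.resetStrategy(tp) == OffsetResetStrategy.EARLIEST
                    ? -2L   // earliest
                    : -1L;  // latest
            RequestFuture<Long> future = fetcher.listOffset(tp, timestamp);
            // ... poll the client until future.isDone(), then:
            if (future.succeeded())
                subscriptions.seek(tp, future.value());  // seek() also clears the reset flag
        }
    }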

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index f73eedb..af9993c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -182,6 +182,21 @@ public class Utils {
     }
 
     /**
+     * Get the minimum of some long values.
+     * @param first Used to ensure at least one value is provided
+     * @param rest The rest of the longs to compare
+     * @return The minimum of all passed arguments.
+     */
+    public static long min(long first, long ... rest) {
+        long min = first;
+        for (int i = 0; i < rest.length; i++) {
+            if (rest[i] < min)
+                min = rest[i];
+        }
+        return min;
+    }
+
+    /**
      * Get the length for UTF8-encoding a string without encoding it first
      * 
      * @param s The string to calculate the length for
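
Utils.min() gives the consumer a convenient way to bound how long a single poll may block by several competing deadlines at once. A hedged usage sketch (variable names are illustrative; timeToNextUpdate is assumed to be the Metadata accessor for the next scheduled refresh):

    // block no longer than the caller's budget, the next heartbeat,
    // or the next metadata refresh, whichever comes first
    long pollTimeout = Utils.min(remainingMs,
            heartbeat.timeToNextHeartbeat(now),
            metadata.timeToNextUpdate(now));
    client.poll(pollTimeout, now);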

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 677edd3..26b6b40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -25,7 +25,7 @@ import org.junit.Test;
 
 public class MockConsumerTest {
     
-    private MockConsumer<String, String> consumer = new MockConsumer<String, String>();
+    private MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST);
 
     @Test
     public void testSimpleMock() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 1454ab7..613b192 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -17,10 +17,11 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -49,24 +50,20 @@ public class CoordinatorTest {
     private String topicName = "test";
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
-    private long retryBackoffMs = 0L;
     private int sessionTimeoutMs = 10;
     private String rebalanceStrategy = "not-matter";
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
-    private SubscriptionState subscriptions = new SubscriptionState();
+    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private Metrics metrics = new Metrics(time);
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
     private Coordinator coordinator = new Coordinator(client,
         groupId,
-        retryBackoffMs,
         sessionTimeoutMs,
         rebalanceStrategy,
-        metadata,
         subscriptions,
         metrics,
         "consumer" + groupId,
@@ -75,13 +72,14 @@ public class CoordinatorTest {
 
     @Before
     public void setup() {
-        metadata.update(cluster, time.milliseconds());
         client.setNode(node);
     }
 
     @Test
     public void testNormalHeartbeat() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // normal heartbeat
         time.sleep(sessionTimeoutMs);
@@ -94,6 +92,8 @@ public class CoordinatorTest {
     @Test
     public void testCoordinatorNotAvailable() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // consumer_coordinator_not_available will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -108,6 +108,8 @@ public class CoordinatorTest {
     @Test
     public void testNotCoordinator() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // not_coordinator will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -122,6 +124,8 @@ public class CoordinatorTest {
     @Test
     public void testIllegalGeneration() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // illegal_generation will cause re-partition
         subscriptions.subscribe(topicName);
@@ -139,6 +143,8 @@ public class CoordinatorTest {
     @Test
     public void testCoordinatorDisconnect() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // coordinator disconnect will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -152,39 +158,67 @@ public class CoordinatorTest {
 
     @Test
     public void testNormalJoinGroup() {
+        subscriptions.subscribe(topicName);
+        subscriptions.needReassignment();
+
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // normal join group
         client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
-        assertEquals(Collections.singletonList(tp),
-            coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
-        assertEquals(0, client.inFlightRequestCount());
+        coordinator.assignPartitions(time.milliseconds());
+        client.poll(0, time.milliseconds());
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
     }
 
     @Test
     public void testReJoinGroup() {
+        subscriptions.subscribe(topicName);
+        subscriptions.needReassignment();
+
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
+        assertTrue(subscriptions.partitionAssignmentNeeded());
 
         // a disconnect from the original coordinator will cause re-discovery and re-joining
         client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true);
+        coordinator.assignPartitions(time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(subscriptions.partitionAssignmentNeeded());
+
+        // rediscover the coordinator
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
+
+        // try assigning partitions again
         client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
-        assertEquals(Collections.singletonList(tp),
-            coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
-        assertEquals(0, client.inFlightRequestCount());
+        coordinator.assignPartitions(time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
     }
 
 
     @Test
     public void testCommitOffsetNormal() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
-        // sync commit
+        // With success flag
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+        RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(result.isDone());
+        assertTrue(result.succeeded());
 
-        // async commit
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        // Without success flag
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
         client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         assertEquals(1, client.poll(0, time.milliseconds()).size());
     }
@@ -192,34 +226,55 @@ public class CoordinatorTest {
     @Test
     public void testCommitOffsetError() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // async commit with coordinator not available
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
         assertEquals(1, client.poll(0, time.milliseconds()).size());
         assertTrue(coordinator.coordinatorUnknown());
         // resume
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // async commit with not coordinator
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
         assertEquals(1, client.poll(0, time.milliseconds()).size());
         assertTrue(coordinator.coordinatorUnknown());
         // resume
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // sync commit with not_coordinator
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+        RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(result.isDone());
+        assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
 
         // sync commit with coordinator disconnected
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+        result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+
+        assertEquals(0, client.poll(0, time.milliseconds()).size());
+        assertTrue(result.isDone());
+        assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
+
+        result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(result.isDone());
+        assertTrue(result.succeeded());
     }
 
 
@@ -227,33 +282,70 @@ public class CoordinatorTest {
     public void testFetchOffset() {
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // normal fetch
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+        RequestFuture<Map<TopicPartition, Long>> result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertEquals(100L, (long) result.value().get(tp));
 
         // fetch with loading in progress
         client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.failed());
+        assertEquals(RequestFuture.RetryAction.BACKOFF, result.retryAction());
+
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertEquals(100L, (long) result.value().get(tp));
 
         // fetch with not coordinator
         client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.failed());
+        assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
+
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertEquals(100L, (long) result.value().get(tp));
 
         // fetch with no fetchable offsets
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
-        assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.value().isEmpty());
 
         // fetch with offset topic unknown
         client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L));
-        assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.value().isEmpty());
 
         // fetch with offset -1
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
-        assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.value().isEmpty());
     }
 
     private Struct consumerMetadataResponse(Node node, short error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4195410..405efdc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,11 +16,10 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import static org.junit.Assert.assertEquals;
-
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -30,10 +29,11 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -41,37 +41,33 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class FetcherTest {
 
     private String topicName = "test";
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
-    private long retryBackoffMs = 0L;
     private int minBytes = 1;
     private int maxWaitMs = 0;
     private int fetchSize = 1000;
-    private String offsetReset = "EARLIEST";
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
-    private SubscriptionState subscriptions = new SubscriptionState();
+    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private Metrics metrics = new Metrics(time);
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
 
     private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(client,
-        retryBackoffMs,
         minBytes,
         maxWaitMs,
         fetchSize,
         true, // check crc
-        offsetReset,
         new ByteArrayDeserializer(),
         new ByteArrayDeserializer(),
         metadata,
@@ -140,11 +136,11 @@ public class FetcherTest {
         subscriptions.fetched(tp, 5);
         fetcher.initFetches(cluster, time.milliseconds());
         client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
-        client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
         client.poll(0, time.milliseconds());
+        assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertEquals(0L, (long) subscriptions.fetched(tp));
-        assertEquals(0L, (long) subscriptions.consumed(tp));
+        assertEquals(null, subscriptions.fetched(tp));
+        assertEquals(null, subscriptions.consumed(tp));
     }
 
     @Test
@@ -157,11 +153,11 @@ public class FetcherTest {
         // fetch with out of range
         fetcher.initFetches(cluster, time.milliseconds());
         client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
-        client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
         client.poll(0, time.milliseconds());
+        assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertEquals(0L, (long) subscriptions.fetched(tp));
-        assertEquals(0L, (long) subscriptions.consumed(tp));
+        assertEquals(null, subscriptions.fetched(tp));
+        assertEquals(null, subscriptions.consumed(tp));
     }
 
     private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
@@ -169,9 +165,5 @@ public class FetcherTest {
         return response.toStruct();
     }
 
-    private Struct listOffsetResponse(List<Long> offsets, short error) {
-        ListOffsetResponse response = new ListOffsetResponse(Collections.singletonMap(tp, new ListOffsetResponse.PartitionData(error, offsets)));
-        return response.toStruct();
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index ecc78ce..ee1ede0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.MockTime;
 
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -42,4 +43,12 @@ public class HeartbeatTest {
         time.sleep(timeout / (2 * Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL));
         assertFalse(heartbeat.shouldHeartbeat(time.milliseconds()));
     }
+
+    @Test
+    public void testTimeToNextHeartbeat() {
+        heartbeat.sentHeartbeat(0);
+        assertEquals(100, heartbeat.timeToNextHeartbeat(0));
+        assertEquals(0, heartbeat.timeToNextHeartbeat(100));
+        assertEquals(0, heartbeat.timeToNextHeartbeat(200));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index e000cf8..319751c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -22,12 +22,13 @@ import static java.util.Arrays.asList;
 
 import java.util.Collections;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
 public class SubscriptionStateTest {
     
-    private final SubscriptionState state = new SubscriptionState();
+    private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private final TopicPartition tp0 = new TopicPartition("test", 0);
     private final TopicPartition tp1 = new TopicPartition("test", 1);
 
@@ -43,7 +44,21 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertAllPositions(tp0, null);
     }
-    
+
+    @Test
+    public void partitionReset() {
+        state.subscribe(tp0);
+        state.seek(tp0, 5);
+        assertEquals(5L, (long) state.fetched(tp0));
+        assertEquals(5L, (long) state.consumed(tp0));
+        state.needOffsetReset(tp0);
+        assertTrue(state.isOffsetResetNeeded());
+        assertTrue(state.isOffsetResetNeeded(tp0));
+        assertEquals(null, state.fetched(tp0));
+        assertEquals(null, state.consumed(tp0));
+    }
+
+    @Test
     public void topicSubscription() {
         state.subscribe("test");
         assertEquals(1, state.subscribedTopics().size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 2ebe3c2..e7951d8 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -100,4 +100,12 @@ public class UtilsTest {
         buffer = ByteBuffer.wrap(myvar).asReadOnlyBuffer();
         this.subTest(buffer);
     }
+
+    @Test
+    public void testMin() {
+        assertEquals(1, Utils.min(1));
+        assertEquals(1, Utils.min(1, 2, 3));
+        assertEquals(1, Utils.min(2, 1, 3));
+        assertEquals(1, Utils.min(2, 3, 1));
+    }
 }
\ No newline at end of file


[2/2] kafka git commit: kafka-2168; New consumer poll() can block other calls like position(), commit(), and close() indefinitely; patched by Jason Gustafson; reviewed by Jay Kreps, Ewen Cheslack-Postava, Guozhang Wang and Jun Rao

Posted by ju...@apache.org.
kafka-2168; New consumer poll() can block other calls like position(), commit(), and close() indefinitely; patched by Jason Gustafson; reviewed by Jay Kreps, Ewen Cheslack-Postava, Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6d326b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6d326b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6d326b0

Branch: refs/heads/trunk
Commit: b6d326b0893e60b350608260fd1bd2542337cb5a
Parents: 2270a75
Author: Jason Gustafson <as...@confluent.io>
Authored: Tue Jun 23 00:07:19 2015 -0400
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jun 23 00:09:06 2015 -0400

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java |   5 +
 .../kafka/clients/consumer/ConsumerRecords.java |   7 +
 .../consumer/ConsumerWakeupException.java       |  20 +
 .../kafka/clients/consumer/KafkaConsumer.java   | 715 +++++++++++++++----
 .../kafka/clients/consumer/MockConsumer.java    |   9 +-
 .../clients/consumer/OffsetResetStrategy.java   |  17 +
 .../clients/consumer/internals/Coordinator.java | 447 ++++++------
 .../clients/consumer/internals/Fetcher.java     | 159 ++---
 .../clients/consumer/internals/Heartbeat.java   |  10 +
 .../consumer/internals/RequestFuture.java       | 209 ++++++
 .../consumer/internals/SubscriptionState.java   |  41 +-
 .../org/apache/kafka/common/utils/Utils.java    |  15 +
 .../clients/consumer/MockConsumerTest.java      |   2 +-
 .../consumer/internals/CoordinatorTest.java     | 148 +++-
 .../clients/consumer/internals/FetcherTest.java |  32 +-
 .../consumer/internals/HeartbeatTest.java       |   9 +
 .../internals/SubscriptionStateTest.java        |  19 +-
 .../apache/kafka/common/utils/UtilsTest.java    |   8 +
 18 files changed, 1330 insertions(+), 542 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 8f587bc..fd98740 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -108,4 +108,9 @@ public interface Consumer<K, V> extends Closeable {
      */
     public void close();
 
+    /**
+     * @see KafkaConsumer#wakeup()
+     */
+    public void wakeup();
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 1ca75f8..eb75d2e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -27,6 +27,8 @@ import java.util.Map;
  * {@link Consumer#poll(long)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+    public static final ConsumerRecords<Object, Object> EMPTY =
+            new ConsumerRecords<Object, Object>(Collections.EMPTY_MAP);
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
 
@@ -103,4 +105,9 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public static <K, V> ConsumerRecords<K, V> empty() {
+        return (ConsumerRecords<K, V>) EMPTY;
+    }
+
 }
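
The EMPTY constant together with the unchecked cast in empty() is the same trick
Collections.emptyMap() uses: one immutable instance is shared across all type
parameters, which is safe because an empty instance can never hand out a K or a V.
A standalone sketch of the pattern (Holder is an illustrative name, not from the
patch):

    import java.util.Collections;
    import java.util.Map;

    // Illustrative sketch of the typed-empty-singleton pattern behind
    // ConsumerRecords.empty(); Holder is a hypothetical stand-in class.
    final class Holder<K, V> {
        private static final Holder<Object, Object> EMPTY =
                new Holder<Object, Object>(Collections.<Object, Object>emptyMap());

        private final Map<K, V> contents;

        private Holder(Map<K, V> contents) {
            this.contents = contents;
        }

        // Safe: EMPTY is immutable and never exposes a value of type K or V.
        @SuppressWarnings("unchecked")
        static <K, V> Holder<K, V> empty() {
            return (Holder<K, V>) EMPTY;
        }
    }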

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
new file mode 100644
index 0000000..35f1ec9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+
+public class ConsumerWakeupException extends KafkaException {
+    private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 951c34c..9be8fbc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -12,44 +12,48 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.Coordinator;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.RequestFuture;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.common.utils.Utils.min;
+
 /**
  * A Kafka client that consumes records from a Kafka cluster.
  * <p>
@@ -298,10 +302,54 @@ import org.slf4j.LoggerFactory;
  * 
  * <h3>Multithreaded Processing</h3>
  * 
- * The Kafka consumer is threadsafe but coarsely synchronized. All network I/O happens in the thread of the application
- * making the call. We have intentionally avoided implementing a particular threading model for processing.
+ * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
+ * making the call. It is the responsibility of the user to ensure that multi-threaded access
+ * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
+ *
+ * <p>
+ * The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to
+ * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread
+ * blocking on the operation. This can be used to shut down the consumer from another thread. The following
+ * snippet shows the typical pattern:
+ *
+ * <pre>
+ * public class KafkaConsumerRunner implements Runnable {
+ *     private final AtomicBoolean closed = new AtomicBoolean(false);
+ *     private final KafkaConsumer consumer;
+ *
+ *     public void run() {
+ *         try {
+ *             consumer.subscribe("topic");
+ *             while (!closed.get()) {
+ *                 ConsumerRecords records = consumer.poll(10000);
+ *                 // Handle new records
+ *             }
+ *         } catch (ConsumerWakeupException e) {
+ *             // Ignore exception if closing
+ *             if (!closed.get()) throw e;
+ *         } finally {
+ *             consumer.close();
+ *         }
+ *     }
+ *
+ *     public void shutdown() {
+ *         closed.set(true);
+ *         consumer.wakeup();
+ *     }
+ * }
+ * </pre>
+ *
+ * Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.
+ *
+ * <pre>
+ *     closed.set(true);
+ *     consumer.wakeup();
+ * </pre>
+ *
  * <p>
- * This leaves several options for implementing multi-threaded processing of records.
+ * We have intentionally avoided implementing a particular threading model for processing. This leaves several
+ * options for implementing multi-threaded processing of records.
+ *
  * 
  * <h4>1. One Consumer Per Thread</h4>
  * 
@@ -363,6 +411,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final ConsumerRebalanceCallback rebalanceCallback;
     private long lastCommitAttemptMs;
     private boolean closed = false;
+    private final AtomicBoolean wakeup = new AtomicBoolean(false);
+
+    // currentThread holds the threadId of the current thread accessing KafkaConsumer
+    // and is used to prevent multi-threaded access
+    private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
+    // refcount is used to allow reentrant access by the thread that has acquired currentThread
+    private int refcount = 0;
+
+    // TODO: This timeout controls how long we should wait before retrying a request. We should be able
+    //       to leverage the work of KAFKA-2120 to get this value from configuration.
+    private long requestTimeoutMs = 5000L;
 
     /**
      * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -480,13 +539,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
-            this.subscriptions = new SubscriptionState();
+            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
+            this.subscriptions = new SubscriptionState(offsetResetStrategy);
             this.coordinator = new Coordinator(this.client,
                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
-                    this.retryBackoffMs,
                     config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                     config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-                    this.metadata,
                     this.subscriptions,
                     metrics,
                     metricGrpPrefix,
@@ -508,12 +566,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 this.valueDeserializer = valueDeserializer;
             }
             this.fetcher = new Fetcher<K, V>(this.client,
-                    this.retryBackoffMs,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                     config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
-                    config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(),
                     this.keyDeserializer,
                     this.valueDeserializer,
                     this.metadata,
@@ -542,8 +598,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * then this will give the set of topics currently assigned to the consumer (which may be none if the assignment
      * hasn't happened yet, or the partitions are in the process of getting reassigned).
      */
-    public synchronized Set<TopicPartition> subscriptions() {
-        return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
+    public Set<TopicPartition> subscriptions() {
+        acquire();
+        try {
+            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -561,12 +622,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param topics A variable list of topics that the consumer wants to subscribe to
      */
     @Override
-    public synchronized void subscribe(String... topics) {
-        ensureNotClosed();
-        log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
-        for (String topic : topics)
-            this.subscriptions.subscribe(topic);
-        metadata.addTopics(topics);
+    public void subscribe(String... topics) {
+        acquire();
+        try {
+            log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
+            for (String topic : topics)
+                this.subscriptions.subscribe(topic);
+            metadata.addTopics(topics);
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -574,16 +639,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
      * metadata change.
      * <p>
-     * 
+     *
      * @param partitions Partitions to incrementally subscribe to
      */
     @Override
-    public synchronized void subscribe(TopicPartition... partitions) {
-        ensureNotClosed();
-        log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
-        for (TopicPartition tp : partitions) {
-            this.subscriptions.subscribe(tp);
-            metadata.addTopics(tp.topic());
+    public void subscribe(TopicPartition... partitions) {
+        acquire();
+        try {
+            log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
+            for (TopicPartition tp : partitions) {
+                this.subscriptions.subscribe(tp);
+                metadata.addTopics(tp.topic());
+            }
+        } finally {
+            release();
         }
     }
 
@@ -593,12 +662,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * 
      * @param topics Topics to unsubscribe from
      */
-    public synchronized void unsubscribe(String... topics) {
-        ensureNotClosed();
-        log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
-        // throw an exception if the topic was never subscribed to
-        for (String topic : topics)
-            this.subscriptions.unsubscribe(topic);
+    public void unsubscribe(String... topics) {
+        acquire();
+        try {
+            log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
+            // throw an exception if the topic was never subscribed to
+            for (String topic : topics)
+                this.subscriptions.unsubscribe(topic);
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -607,12 +680,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * 
      * @param partitions Partitions to unsubscribe from
      */
-    public synchronized void unsubscribe(TopicPartition... partitions) {
-        ensureNotClosed();
-        log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
-        // throw an exception if the partition was never subscribed to
-        for (TopicPartition partition : partitions)
-            this.subscriptions.unsubscribe(partition);
+    public void unsubscribe(TopicPartition... partitions) {
+        acquire();
+        try {
+            log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
+            // throw an exception if the partition was never subscribed to
+            for (TopicPartition partition : partitions)
+                this.subscriptions.unsubscribe(partition);
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -624,17 +701,65 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
      * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
      * 
-     * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits
-     *            indefinitely. Must not be negative
+     * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
+     *            immediately with any records available now. Must not be negative.
      * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
      * 
      * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
      *             offset reset policy has been configured.
      */
     @Override
-    public synchronized ConsumerRecords<K, V> poll(long timeout) {
-        ensureNotClosed();
-        long now = time.milliseconds();
+    public ConsumerRecords<K, V> poll(long timeout) {
+        acquire();
+        try {
+            if (timeout < 0)
+                throw new IllegalArgumentException("Timeout must not be negative");
+
+            // Poll for new data until the timeout expires
+            long remaining = timeout;
+            while (remaining >= 0) {
+                long start = time.milliseconds();
+                long pollTimeout = min(remaining, timeToNextCommit(start), coordinator.timeToNextHeartbeat(start));
+
+                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(pollTimeout, start);
+                long end = time.milliseconds();
+
+                if (!records.isEmpty()) {
+                    // If data is available, then return it, but first send off the
+                    // next round of fetches to enable pipelining while the user is
+                    // handling the fetched records.
+                    fetcher.initFetches(metadata.fetch(), end);
+                    pollClient(0, end);
+                    return new ConsumerRecords<K, V>(records);
+                }
+
+                remaining -= end - start;
+
+                // Nothing was available, so we should backoff before retrying
+                if (remaining > 0) {
+                    Utils.sleep(min(remaining, retryBackoffMs));
+                    remaining -= time.milliseconds() - end;
+                }
+            }
+
+            return ConsumerRecords.empty();
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Do one round of polling. In addition to checking for new data, this does any needed
+     * heart-beating, auto-commits, and offset updates.
+     * @param timeout The maximum time to block in the underlying poll
+     * @param now Current time in millis
+     * @return The fetched records (may be empty)
+     */
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout, long now) {
+        Cluster cluster = this.metadata.fetch();
+
+        // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
 
         if (subscriptions.partitionsAutoAssigned()) {
             if (subscriptions.partitionAssignmentNeeded()) {
@@ -649,26 +774,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         // fetch positions if we have partitions we're subscribed to that we
         // don't know the offset for
         if (!subscriptions.hasAllFetchPositions())
-            updateFetchPositions(this.subscriptions.missingFetchPositions(), now);
+            updateFetchPositions(this.subscriptions.missingFetchPositions());
 
         // maybe autocommit position
         if (shouldAutoCommit(now))
             commit(CommitType.ASYNC);
 
-        /*
-         * initiate any needed fetches, then block for the timeout the user specified
-         */
-        Cluster cluster = this.metadata.fetch();
+        // Init any new fetches (won't resend pending fetches)
         fetcher.initFetches(cluster, now);
-        client.poll(timeout, now);
 
-        /*
-         * initiate a fetch request for any nodes that we just got a response from without blocking
-         */
-        fetcher.initFetches(cluster, now);
-        client.poll(0, now);
+        pollClient(timeout, now);
 
-        return new ConsumerRecords<K, V>(fetcher.fetchedRecords());
+        return fetcher.fetchedRecords();
     }
 
     /**
@@ -686,18 +803,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param commitType Control whether the commit is blocking
      */
     @Override
-    public synchronized void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
-        ensureNotClosed();
-        log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
+    public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
+        acquire();
+        try {
+            log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
 
-        long now = time.milliseconds();
-        this.lastCommitAttemptMs = now;
+            this.lastCommitAttemptMs = time.milliseconds();
 
-        // commit the offsets with the coordinator
-        boolean syncCommit = commitType.equals(CommitType.SYNC);
-        if (!syncCommit)
-            this.subscriptions.needRefreshCommits();
-        coordinator.commitOffsets(offsets, syncCommit, now);
+            // commit the offsets with the coordinator
+            if (commitType == CommitType.ASYNC)
+                this.subscriptions.needRefreshCommits();
+            commitOffsets(offsets, commitType);
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -710,9 +829,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param commitType Whether or not the commit should block until it is acknowledged.
      */
     @Override
-    public synchronized void commit(CommitType commitType) {
-        ensureNotClosed();
-        commit(this.subscriptions.allConsumed(), commitType);
+    public void commit(CommitType commitType) {
+        acquire();
+        try {
+            commit(this.subscriptions.allConsumed(), commitType);
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -721,35 +844,43 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
      */
     @Override
-    public synchronized void seek(TopicPartition partition, long offset) {
-        ensureNotClosed();
-        log.debug("Seeking to offset {} for partition {}", offset, partition);
-        this.subscriptions.seek(partition, offset);
+    public void seek(TopicPartition partition, long offset) {
+        acquire();
+        try {
+            log.debug("Seeking to offset {} for partition {}", offset, partition);
+            this.subscriptions.seek(partition, offset);
+        } finally {
+            release();
+        }
     }
 
     /**
      * Seek to the first offset for each of the given partitions
      */
-    public synchronized void seekToBeginning(TopicPartition... partitions) {
-        ensureNotClosed();
-        Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
-                : Arrays.asList(partitions);
-        for (TopicPartition tp : parts) {
-            // TODO: list offset call could be optimized by grouping by node
-            seek(tp, fetcher.offsetBefore(tp, EARLIEST_OFFSET_TIMESTAMP));
+    public void seekToBeginning(TopicPartition... partitions) {
+        acquire();
+        try {
+            Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+                    : Arrays.asList(partitions);
+            for (TopicPartition tp : parts)
+                subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+        } finally {
+            release();
         }
     }
 
     /**
      * Seek to the last offset for each of the given partitions
      */
-    public synchronized void seekToEnd(TopicPartition... partitions) {
-        ensureNotClosed();
-        Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
-                : Arrays.asList(partitions);
-        for (TopicPartition tp : parts) {
-            // TODO: list offset call could be optimized by grouping by node
-            seek(tp, fetcher.offsetBefore(tp, LATEST_OFFSET_TIMESTAMP));
+    public void seekToEnd(TopicPartition... partitions) {
+        acquire();
+        try {
+            Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+                    : Arrays.asList(partitions);
+            for (TopicPartition tp : parts)
+                subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+        } finally {
+            release();
         }
     }
 
@@ -761,16 +892,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
      *             available.
      */
-    public synchronized long position(TopicPartition partition) {
-        ensureNotClosed();
-        if (!this.subscriptions.assignedPartitions().contains(partition))
-            throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
-        Long offset = this.subscriptions.consumed(partition);
-        if (offset == null) {
-            updateFetchPositions(Collections.singleton(partition), time.milliseconds());
-            return this.subscriptions.consumed(partition);
-        } else {
-            return offset;
+    public long position(TopicPartition partition) {
+        acquire();
+        try {
+            if (!this.subscriptions.assignedPartitions().contains(partition))
+                throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
+            Long offset = this.subscriptions.consumed(partition);
+            if (offset == null) {
+                updateFetchPositions(Collections.singleton(partition));
+                return this.subscriptions.consumed(partition);
+            } else {
+                return offset;
+            }
+        } finally {
+            release();
         }
     }
 
@@ -787,22 +922,26 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             partition.
      */
     @Override
-    public synchronized long committed(TopicPartition partition) {
-        ensureNotClosed();
-        Set<TopicPartition> partitionsToFetch;
-        if (subscriptions.assignedPartitions().contains(partition)) {
+    public long committed(TopicPartition partition) {
+        acquire();
+        try {
+            Set<TopicPartition> partitionsToFetch;
+            if (subscriptions.assignedPartitions().contains(partition)) {
+                Long committed = this.subscriptions.committed(partition);
+                if (committed != null)
+                    return committed;
+                partitionsToFetch = subscriptions.assignedPartitions();
+            } else {
+                partitionsToFetch = Collections.singleton(partition);
+            }
+            refreshCommittedOffsets(partitionsToFetch);
             Long committed = this.subscriptions.committed(partition);
-            if (committed != null)
-                return committed;
-            partitionsToFetch = subscriptions.assignedPartitions();
-        } else {
-            partitionsToFetch = Collections.singleton(partition);
+            if (committed == null)
+                throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
+            return committed;
+        } finally {
+            release();
         }
-        refreshCommittedOffsets(partitionsToFetch, time.milliseconds());
-        Long committed = this.subscriptions.committed(partition);
-        if (committed == null)
-            throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
-        return committed;
     }
 
     /**
@@ -822,19 +961,41 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        Cluster cluster = this.metadata.fetch();
-        List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
-        if (parts == null) {
-            metadata.add(topic);
-            awaitMetadataUpdate();
-            parts = metadata.fetch().partitionsForTopic(topic);
+        acquire();
+        try {
+            Cluster cluster = this.metadata.fetch();
+            List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
+            if (parts == null) {
+                metadata.add(topic);
+                awaitMetadataUpdate();
+                parts = metadata.fetch().partitionsForTopic(topic);
+            }
+            return parts;
+        } finally {
+            release();
         }
-        return parts;
     }
 
     @Override
-    public synchronized void close() {
-        close(false);
+    public void close() {
+        if (closed) return;
+
+        acquire();
+        try {
+            close(false);
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll.
+     * The thread which is blocking in an operation will throw {@link ConsumerWakeupException}.
+     */
+    @Override
+    public void wakeup() {
+        this.wakeup.set(true);
+        this.client.wakeup();
     }
 
     private void close(boolean swallowException) {
@@ -856,6 +1017,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
     }
 
+    private long timeToNextCommit(long now) {
+        if (!this.autoCommit)
+            return Long.MAX_VALUE;
+        long timeSinceLastCommit = now - this.lastCommitAttemptMs;
+        if (timeSinceLastCommit > this.autoCommitIntervalMs)
+            return 0;
+        return this.autoCommitIntervalMs - timeSinceLastCommit;
+    }
+
     /**
      * Request a metadata update and wait until it has occurred
      */
@@ -863,7 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         int version = this.metadata.requestUpdate();
         do {
             long now = time.milliseconds();
-            this.client.poll(this.retryBackoffMs, now);
+            this.pollClient(this.retryBackoffMs, now);
         } while (this.metadata.version() == version);
     }
 
@@ -881,8 +1051,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
 
         // get new assigned partitions from the coordinator
-        this.subscriptions.changePartitionAssignment(coordinator.assignPartitions(
-            new ArrayList<String>(this.subscriptions.subscribedTopics()), now));
+        assignPartitions();
 
         // execute the user's callback after rebalance
         log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
@@ -899,25 +1068,73 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * or reset it using the offset reset policy the user has configured.
      *
     * @param partitions The partitions whose fetch positions need updating
-     * @param now The current time
      * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *             defined
      */
-    private void updateFetchPositions(Set<TopicPartition> partitions, long now) {
+    private void updateFetchPositions(Set<TopicPartition> partitions) {
         // first refresh the committed positions in case they are not up-to-date
-        refreshCommittedOffsets(partitions, now);
+        refreshCommittedOffsets(partitions);
 
         // reset the fetch position to the committed position
         for (TopicPartition tp : partitions) {
-            if (subscriptions.fetched(tp) == null) {
-                if (subscriptions.committed(tp) == null) {
-                    // if the committed position is unknown reset the position
-                    fetcher.resetOffset(tp);
-                } else {
-                    log.debug("Resetting offset for partition {} to the committed offset {}",
-                        tp, subscriptions.committed(tp));
-                    subscriptions.seek(tp, subscriptions.committed(tp));
-                }
+            // Skip if we already have a fetch position
+            if (subscriptions.fetched(tp) != null)
+                continue;
+
+            // TODO: If there are several offsets to reset, we could submit offset requests in parallel
+            if (subscriptions.isOffsetResetNeeded(tp)) {
+                resetOffset(tp);
+            } else if (subscriptions.committed(tp) == null) {
+                // There's no committed position, so we need to reset with the default strategy
+                subscriptions.needOffsetReset(tp);
+                resetOffset(tp);
+            } else {
+                log.debug("Resetting offset for partition {} to the committed offset {}",
+                    tp, subscriptions.committed(tp));
+                subscriptions.seek(tp, subscriptions.committed(tp));
+            }
+        }
+    }
+
+    /**
+     * Reset offsets for the given partition using the offset reset strategy.
+     *
+     * @param partition The partition whose offset needs to be reset
+     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+     */
+    private void resetOffset(TopicPartition partition) {
+        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
+        final long timestamp;
+        if (strategy == OffsetResetStrategy.EARLIEST)
+            timestamp = EARLIEST_OFFSET_TIMESTAMP;
+        else if (strategy == OffsetResetStrategy.LATEST)
+            timestamp = LATEST_OFFSET_TIMESTAMP;
+        else
+            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+
+        log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
+        long offset = listOffset(partition, timestamp);
+        this.subscriptions.seek(partition, offset);
+    }
+
+    /**
+     * Fetch a single offset before the given timestamp for the partition.
+     *
+     * @param partition The partition whose offset should be fetched.
+     * @param timestamp The timestamp before which to fetch an offset.
+     * @return The offset of the message published before the given timestamp
+     */
+    private long listOffset(TopicPartition partition, long timestamp) {
+        while (true) {
+            RequestFuture<Long> future = fetcher.listOffset(partition, timestamp);
+
+            if (!future.isDone())
+                pollFuture(future, requestTimeoutMs);
+
+            if (future.isDone()) {
+                if (future.succeeded())
+                    return future.value();
+                handleRequestFailure(future);
             }
         }
     }
@@ -925,13 +1142,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Refresh the committed offsets for given set of partitions and update the cache
      */
-    private void refreshCommittedOffsets(Set<TopicPartition> partitions, long now) {
+    private void refreshCommittedOffsets(Set<TopicPartition> partitions) {
         // we only need to fetch latest committed offset from coordinator if there
         // is some commit process in progress, otherwise our current
         // committed cache is up-to-date
         if (subscriptions.refreshCommitsNeeded()) {
             // contact coordinator to fetch committed offsets
-            Map<TopicPartition, Long> offsets = coordinator.fetchOffsets(partitions, now);
+            Map<TopicPartition, Long> offsets = fetchCommittedOffsets(partitions);
 
             // update the position with the offsets
             for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
@@ -941,6 +1158,183 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
+    /**
+     * Block until we have received a partition assignment from the coordinator.
+     */
+    private void assignPartitions() {
+        // Ensure that there are no pending requests to the coordinator. This is important
+        // in particular to avoid resending a pending JoinGroup request.
+        awaitCoordinatorInFlightRequests();
+
+        while (subscriptions.partitionAssignmentNeeded()) {
+            RequestFuture<Void> future = coordinator.assignPartitions(time.milliseconds());
+
+            // Block indefinitely for the join group request (which can take as long as a session timeout)
+            if (!future.isDone())
+                pollFuture(future);
+
+            if (future.failed())
+                handleRequestFailure(future);
+        }
+    }
+
+    /**
+     * Block until the coordinator for this group is known.
+     */
+    private void ensureCoordinatorKnown() {
+        while (coordinator.coordinatorUnknown()) {
+            RequestFuture<Void> future = coordinator.discoverConsumerCoordinator();
+
+            if (!future.isDone())
+                pollFuture(future, requestTimeoutMs);
+
+            if (future.failed())
+                handleRequestFailure(future);
+        }
+    }
+
+    /**
+     * Block until any pending requests to the coordinator have been handled.
+     */
+    public void awaitCoordinatorInFlightRequests() {
+        while (coordinator.hasInFlightRequests()) {
+            long now = time.milliseconds();
+            pollClient(-1, now);
+        }
+    }
+
+    /**
+     * Look up the committed offsets for a set of partitions. This will block until the coordinator has
+     * responded to the offset fetch request.
+     * @param partitions List of partitions to get offsets for
+     * @return Map from partition to its respective offset
+     */
+    private Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+        while (true) {
+            long now = time.milliseconds();
+            RequestFuture<Map<TopicPartition, Long>> future = coordinator.fetchOffsets(partitions, now);
+
+            if (!future.isDone())
+                pollFuture(future, requestTimeoutMs);
+
+            if (future.isDone()) {
+                if (future.succeeded())
+                    return future.value();
+                handleRequestFailure(future);
+            }
+        }
+    }
+
+    /**
+     * Commit offsets. This call blocks (regardless of commitType) until the coordinator
+     * can receive the commit request. Once the request has been made, however, only the
+     * synchronous commits will wait for a successful response from the coordinator.
+     * @param offsets Offsets to commit.
+     * @param commitType Commit policy
+     */
+    private void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType) {
+        if (commitType == CommitType.ASYNC) {
+            commitOffsetsAsync(offsets);
+        } else {
+            commitOffsetsSync(offsets);
+        }
+    }
+
+    private void commitOffsetsAsync(Map<TopicPartition, Long> offsets) {
+        while (true) {
+            long now = time.milliseconds();
+            RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
+
+            if (!future.isDone() || future.succeeded())
+                return;
+
+            handleRequestFailure(future);
+        }
+    }
+
+    private void commitOffsetsSync(Map<TopicPartition, Long> offsets) {
+        while (true) {
+            long now = time.milliseconds();
+            RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
+
+            if (!future.isDone())
+                pollFuture(future, requestTimeoutMs);
+
+            if (future.isDone()) {
+                if (future.succeeded())
+                    return;
+                else
+                    handleRequestFailure(future);
+            }
+        }
+    }
+
+    private void handleRequestFailure(RequestFuture<?> future) {
+        if (future.hasException())
+            throw future.exception();
+
+        switch (future.retryAction()) {
+            case BACKOFF:
+                Utils.sleep(retryBackoffMs);
+                break;
+            case POLL:
+                pollClient(retryBackoffMs, time.milliseconds());
+                break;
+            case FIND_COORDINATOR:
+                ensureCoordinatorKnown();
+                break;
+            case REFRESH_METADATA:
+                awaitMetadataUpdate();
+                break;
+            case NOOP:
+                // Do nothing (retry now)
+        }
+    }
+
+    /**
+     * Poll until a result is ready or timeout expires
+     * @param future The future to poll for
+     * @param timeout The time in milliseconds to wait for the result
+     */
+    private void pollFuture(RequestFuture<?> future, long timeout) {
+        // TODO: Update this code for KAFKA-2120, which adds request timeout to NetworkClient
+        // In particular, we must ensure that "timed out" requests will not have their callbacks
+        // invoked at a later time.
+        long remaining = timeout;
+        while (!future.isDone() && remaining >= 0) {
+            long start = time.milliseconds();
+            pollClient(remaining, start);
+            if (future.isDone()) return;
+            remaining -= time.milliseconds() - start;
+        }
+    }
+
+    /**
+     * Poll indefinitely until the result is ready.
+     * @param future The future to poll for.
+     */
+    private void pollFuture(RequestFuture<?> future) {
+        while (!future.isDone()) {
+            long now = time.milliseconds();
+            pollClient(-1, now);
+        }
+    }
+
+    /**
+     * Poll for IO.
+     * @param timeout The maximum time to wait for IO to become available
+     * @param now The current time in milliseconds
+     * @throws ConsumerWakeupException if {@link #wakeup()} is invoked while the poll is active
+     */
+    private void pollClient(long timeout, long now) {
+        this.client.poll(timeout, now);
+
+        if (wakeup.get()) {
+            wakeup.set(false);
+            throw new ConsumerWakeupException();
+        }
+    }
+
     /*
      * Check that the consumer hasn't been closed.
      */
@@ -948,4 +1342,27 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         if (this.closed)
             throw new IllegalStateException("This consumer has already been closed.");
     }
+
+    /**
+     * Acquire the lightweight lock protecting this consumer from multi-threaded access. Instead of blocking
+     * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
+     * supported).
+     * @throws IllegalStateException if the consumer has been closed
+     * @throws ConcurrentModificationException if another thread already has the lock
+     */
+    private void acquire() {
+        ensureNotClosed();
+        Long threadId = Thread.currentThread().getId();
+        if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
+            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
+        refcount++;
+    }
+
+    /**
+     * Release the lightweight lock protecting the consumer from multi-threaded access.
+     */
+    private void release() {
+        if (--refcount == 0)
+            currentThread.set(null);
+    }
 }
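
The acquire()/release() pair above replaces the removed synchronized modifiers
with a fail-fast guard: a compare-and-set on the owning thread id, plus a plain
refcount so the owning thread can re-enter. A standalone sketch of the same idea
(SingleOwnerGuard is an illustrative name, not part of the patch):

    import java.util.ConcurrentModificationException;
    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative sketch of the fail-fast, reentrant single-owner guard used
    // by KafkaConsumer.acquire()/release().
    final class SingleOwnerGuard {
        private final AtomicReference<Long> owner = new AtomicReference<Long>();
        private int refcount = 0; // only ever touched by the owning thread

        void acquire() {
            Long threadId = Thread.currentThread().getId();
            // Fail immediately rather than block: concurrent use is a caller bug.
            if (!threadId.equals(owner.get()) && !owner.compareAndSet(null, threadId))
                throw new ConcurrentModificationException("not safe for multi-threaded access");
            refcount++;
        }

        void release() {
            if (--refcount == 0)
                owner.set(null);
        }
    }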

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index f50da82..46e26a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -40,8 +40,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
     private boolean closed;
 
-    public MockConsumer() {
-        this.subscriptions = new SubscriptionState();
+    public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
+        this.subscriptions = new SubscriptionState(offsetResetStrategy);
         this.partitions = new HashMap<String, List<PartitionInfo>>();
         this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
         this.closed = false;
@@ -175,6 +175,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.closed = true;
     }
 
+    @Override
+    public void wakeup() {
+        // No-op: the mock consumer never blocks, so there is nothing to interrupt.
+    }
+
     private void ensureNotClosed() {
         if (this.closed)
             throw new IllegalStateException("This consumer has already been closed.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
new file mode 100644
index 0000000..542da7f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+public enum OffsetResetStrategy {
+    LATEST, EARLIEST, NONE
+}
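
The constructor change above wires this enum up via
OffsetResetStrategy.valueOf(config.getString(...).toUpperCase()). A small sketch
of that mapping in isolation ("latest" stands in for a user-supplied
auto.offset.reset value; ResetStrategyExample is an illustrative class):

    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    public class ResetStrategyExample {
        public static void main(String[] args) {
            String configured = "latest";
            // Mirrors the KafkaConsumer constructor: upper-case, then valueOf.
            OffsetResetStrategy strategy =
                    OffsetResetStrategy.valueOf(configured.toUpperCase());
            // EARLIEST and LATEST select the reset target in resetOffset();
            // NONE makes a needed reset throw NoOffsetForPartitionException.
            System.out.println(strategy); // prints LATEST
        }
    }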

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 41cb945..6c26667 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -15,7 +15,6 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
@@ -57,7 +56,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This class manage the coordination process with the consumer coordinator.
+ * This class manages the coordination process with the consumer coordinator.
  */
 public final class Coordinator {
 
@@ -67,13 +66,11 @@ public final class Coordinator {
 
     private final Time time;
     private final String groupId;
-    private final Metadata metadata;
     private final Heartbeat heartbeat;
     private final int sessionTimeoutMs;
     private final String assignmentStrategy;
     private final SubscriptionState subscriptions;
     private final CoordinatorMetrics sensors;
-    private final long retryBackoffMs;
     private Node consumerCoordinator;
     private String consumerId;
     private int generation;
@@ -83,10 +80,8 @@ public final class Coordinator {
      */
     public Coordinator(KafkaClient client,
                        String groupId,
-                       long retryBackoffMs,
                        int sessionTimeoutMs,
                        String assignmentStrategy,
-                       Metadata metadata,
                        SubscriptionState subscriptions,
                        Metrics metrics,
                        String metricGrpPrefix,
@@ -98,10 +93,8 @@ public final class Coordinator {
         this.generation = -1;
         this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
         this.groupId = groupId;
-        this.metadata = metadata;
         this.consumerCoordinator = null;
         this.subscriptions = subscriptions;
-        this.retryBackoffMs = retryBackoffMs;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.assignmentStrategy = assignmentStrategy;
         this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
@@ -109,84 +102,110 @@ public final class Coordinator {
     }
 
     /**
-     * Assign partitions for the subscribed topics.
-     *
-     * @param subscribedTopics The subscribed topics list
-     * @param now The current time
-     * @return The assigned partition info
+     * Send a request to get a new partition assignment. This is a non-blocking call which sends
+     * a JoinGroup request to the coordinator (if it is available). The returned future must
+     * be polled to see if the request completed successfully.
+     * @param now The current time in milliseconds
+     * @return A request future whose completion indicates the result of the JoinGroup request.
      */
-    public List<TopicPartition> assignPartitions(List<String> subscribedTopics, long now) {
+    public RequestFuture<Void> assignPartitions(final long now) {
+        final RequestFuture<Void> future = newCoordinatorRequestFuture(now);
+        if (future.isDone()) return future;
 
         // send a join group request to the coordinator
+        List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscribedTopics());
         log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
 
-        // repeat processing the response until succeed or fatal error
-        do {
-            JoinGroupRequest request = new JoinGroupRequest(groupId,
+        JoinGroupRequest request = new JoinGroupRequest(groupId,
                 this.sessionTimeoutMs,
                 subscribedTopics,
                 this.consumerId,
                 this.assignmentStrategy);
 
-            ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
-            JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
-            short errorCode = response.errorCode();
+        // create the request for the coordinator
+        log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.consumerCoordinator.id());
+
+        RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+            @Override
+            public void onComplete(ClientResponse resp) {
+                handleJoinResponse(resp, future);
+            }
+        };
+
+        sendCoordinator(ApiKeys.JOIN_GROUP, request.toStruct(), completionHandler, now);
+        return future;
+    }
+
+    private void handleJoinResponse(ClientResponse response, RequestFuture<Void> future) {
+        if (response.wasDisconnected()) {
+            handleCoordinatorDisconnect(response);
+            future.retryWithNewCoordinator();
+        } else {
+            // process the response
+            JoinGroupResponse joinResponse = new JoinGroupResponse(response.responseBody());
+            short errorCode = joinResponse.errorCode();
 
             if (errorCode == Errors.NONE.code()) {
-                this.consumerId = response.consumerId();
-                this.generation = response.generationId();
+                Coordinator.this.consumerId = joinResponse.consumerId();
+                Coordinator.this.generation = joinResponse.generationId();
 
                 // set the flag to refresh last committed offsets
-                this.subscriptions.needRefreshCommits();
+                subscriptions.needRefreshCommits();
 
                 log.debug("Joined group: {}", response);
 
                 // record re-assignment time
-                this.sensors.partitionReassignments.record(time.milliseconds() - now);
+                this.sensors.partitionReassignments.record(response.requestLatencyMs());
 
-                // return assigned partitions
-                return response.assignedPartitions();
+                // update partition assignment
+                subscriptions.changePartitionAssignment(joinResponse.assignedPartitions());
+                future.complete(null);
             } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
                 // reset the consumer id and retry immediately
-                this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+                Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
                 log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
-                    groupId);
+                        groupId);
+
+                future.retryNow();
             } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
                     || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                 // re-discover the coordinator and retry with backoff
                 coordinatorDead();
-                Utils.sleep(this.retryBackoffMs);
-
                 log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
-                    groupId);
+                        groupId);
+                future.retryWithNewCoordinator();
             } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
                     || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
                     || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
                 // log the error and re-throw the exception
+                KafkaException e = Errors.forCode(errorCode).exception();
                 log.error("Attempt to join group {} failed due to: {}",
-                    groupId, Errors.forCode(errorCode).exception().getMessage());
-                Errors.forCode(errorCode).maybeThrow();
+                        groupId, e.getMessage());
+                future.raise(e);
             } else {
                 // unexpected error, throw the exception
-                throw new KafkaException("Unexpected error in join group response: "
-                    + Errors.forCode(response.errorCode()).exception().getMessage());
+                future.raise(new KafkaException("Unexpected error in join group response: "
+                        + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
             }
-        } while (true);
+        }
     }
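As a reading aid for the refactor: handleJoinResponse no longer loops and sleeps internally; it records an outcome on the RequestFuture and returns, leaving the retry policy to the caller. A minimal caller-side sketch in that spirit (sendJoinGroup stands in for the request method above this hunk, and succeeded() is an assumed RequestFuture accessor, not verbatim from this patch):

    // Poll the network client until the join future resolves, then accept
    // the new assignment or retry according to the future's hint.
    RequestFuture<Void> joinFuture = sendJoinGroup(time.milliseconds());
    while (!joinFuture.isDone())
        client.poll(retryBackoffMs, time.milliseconds()); // fires handleJoinResponse via its callback
    if (!joinFuture.succeeded()) {
        // either a fatal KafkaException was raised, or the future asked for a
        // retry (immediately, after backoff, or with a rediscovered coordinator)
    }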
 
     /**
-     * Commit offsets for the specified list of topics and partitions.
-     *
-     * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails.
-     * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
-     * the commit succeeds.
+     * Commit offsets for the specified list of topics and partitions. This is a non-blocking call:
+     * it returns a request future which can be polled to completion for a synchronous commit, or
+     * simply ignored for an asynchronous one.
      *
      * @param offsets The list of offsets per partition that should be committed.
-     * @param blocking Control whether the commit is blocking
      * @param now The current time
+     * @return A request future whose completion status indicates whether the commit was successful
      */
-    public void commitOffsets(final Map<TopicPartition, Long> offsets, boolean blocking, long now) {
-        if (!offsets.isEmpty()) {
+    public RequestFuture<Void> commitOffsets(final Map<TopicPartition, Long> offsets, long now) {
+        final RequestFuture<Void> future = newCoordinatorRequestFuture(now);
+        if (future.isDone()) return future;
+
+        if (offsets.isEmpty()) {
+            future.complete(null);
+        } else {
             // create the offset commit request
             Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
             offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
@@ -198,52 +217,63 @@ public final class Coordinator {
                 OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 offsetData);
 
-            // send request and possibly wait for response if it is blocking
-            RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets);
+            RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets, future);
+            sendCoordinator(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+        }
 
-            if (blocking) {
-                boolean done;
-                do {
-                    ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+        return future;
+    }
 
-                    // check for errors
-                    done = true;
-                    OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody());
-                    for (short errorCode : commitResponse.responseData().values()) {
-                        if (errorCode != Errors.NONE.code())
-                            done = false;
-                    }
-                    if (!done) {
-                        log.debug("Error in offset commit, backing off for {} ms before retrying again.",
-                            this.retryBackoffMs);
-                        Utils.sleep(this.retryBackoffMs);
-                    }
-                } while (!done);
-            } else {
-                this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now));
-            }
+    private <T> RequestFuture<T> newCoordinatorRequestFuture(long now) {
+        if (coordinatorUnknown())
+            return RequestFuture.newCoordinatorNeeded();
+
+        if (client.ready(this.consumerCoordinator, now))
+            // We have an open connection and we're ready to send
+            return new RequestFuture<T>();
+
+        if (this.client.connectionFailed(this.consumerCoordinator)) {
+            coordinatorDead();
+            return RequestFuture.newCoordinatorNeeded();
         }
+
+        // The connection has been initiated, so we need to poll to finish it
+        return RequestFuture.pollNeeded();
     }
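Taken together, commitOffsets and newCoordinatorRequestFuture reduce a synchronous commit to "send, then poll until the future resolves". A sketch under that reading (retryBackoffMs is the caller's backoff setting; the patched KafkaConsumer may structure its loop differently):

    // Hypothetical synchronous commit built on the non-blocking API.
    RequestFuture<Void> commitFuture = coordinator.commitOffsets(offsets, time.milliseconds());
    while (!commitFuture.isDone())
        client.poll(retryBackoffMs, time.milliseconds()); // drives CommitOffsetCompletionHandler
    // an asynchronous commit simply skips this loop and ignores the future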
 
     /**
-     * Fetch the committed offsets of the given set of partitions.
+     * Fetch the committed offsets for a set of partitions. This is a non-blocking call; the
+     * returned future can be polled to obtain the offsets once the broker responds.
      *
-     * @param partitions The list of partitions which need to ask for committed offsets
-     * @param now The current time
-     * @return The fetched offset values
+     * @param partitions The set of partitions to get offsets for.
+     * @param now The current time in milliseconds
+     * @return A request future containing the committed offsets.
      */
-    public Map<TopicPartition, Long> fetchOffsets(Set<TopicPartition> partitions, long now) {
-        log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
-
-        while (true) {
-            // construct the request
-            OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+    public RequestFuture<Map<TopicPartition, Long>> fetchOffsets(Set<TopicPartition> partitions, long now) {
+        final RequestFuture<Map<TopicPartition, Long>> future = newCoordinatorRequestFuture(now);
+        if (future.isDone()) return future;
 
-            // send the request and block on waiting for response
-            ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.OFFSET_FETCH, request.toStruct(), null, now);
+        log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
+        // construct the request
+        OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+
+        // send the request with a callback
+        RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+            @Override
+            public void onComplete(ClientResponse resp) {
+                handleOffsetResponse(resp, future);
+            }
+        };
+        sendCoordinator(ApiKeys.OFFSET_FETCH, request.toStruct(), completionHandler, now);
+        return future;
+    }
 
+    private void handleOffsetResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
+        if (resp.wasDisconnected()) {
+            handleCoordinatorDisconnect(resp);
+            future.retryWithNewCoordinator();
+        } else {
             // parse the response to get the offsets
-            boolean offsetsReady = true;
             OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
             Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
             for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
@@ -251,23 +281,21 @@ public final class Coordinator {
                 OffsetFetchResponse.PartitionData data = entry.getValue();
                 if (data.hasError()) {
                     log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
-                        .exception()
-                        .getMessage());
+                            .exception()
+                            .getMessage());
                     if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
                         // just retry
-                        offsetsReady = false;
-                        Utils.sleep(this.retryBackoffMs);
+                        future.retryAfterBackoff();
                     } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                         // re-discover the coordinator and retry
                         coordinatorDead();
-                        offsetsReady = false;
-                        Utils.sleep(this.retryBackoffMs);
+                        future.retryWithNewCoordinator();
                     } else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                         // just ignore this partition
                         log.debug("Unknown topic or partition for " + tp);
                     } else {
-                        throw new KafkaException("Unexpected error in fetch offset response: "
-                            + Errors.forCode(data.errorCode).exception().getMessage());
+                        future.raise(new KafkaException("Unexpected error in fetch offset response: "
+                                + Errors.forCode(data.errorCode).exception().getMessage()));
                     }
                 } else if (data.offset >= 0) {
                     // record the position with the offset (-1 indicates no committed offset to fetch)
@@ -277,8 +305,8 @@ public final class Coordinator {
                 }
             }
 
-            if (offsetsReady)
-                return offsets;
+            if (!future.isDone())
+                future.complete(offsets);
         }
     }
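The offset fetch follows the same pattern, except the future carries a value. A sketch, assuming a value() accessor on RequestFuture (an illustrative name, not confirmed by this hunk):

    // Hypothetical blocking wrapper around the non-blocking offset fetch.
    RequestFuture<Map<TopicPartition, Long>> offsetFuture =
            coordinator.fetchOffsets(partitions, time.milliseconds());
    while (!offsetFuture.isDone())
        client.poll(retryBackoffMs, time.milliseconds()); // fires handleOffsetResponse
    Map<TopicPartition, Long> committed = offsetFuture.value(); // value() assumed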
 
@@ -288,124 +316,105 @@ public final class Coordinator {
      * @param now The current time
      */
     public void maybeHeartbeat(long now) {
-        if (heartbeat.shouldHeartbeat(now)) {
+        if (heartbeat.shouldHeartbeat(now) && coordinatorReady(now)) {
             HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
-            this.client.send(initiateCoordinatorRequest(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now));
+            sendCoordinator(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now);
             this.heartbeat.sentHeartbeat(now);
         }
     }
 
-    public boolean coordinatorUnknown() {
-        return this.consumerCoordinator == null;
-    }
-
     /**
-     * Repeatedly attempt to send a request to the coordinator until a response is received (retry if we are
-     * disconnected). Note that this means any requests sent this way must be idempotent.
-     *
-     * @return The response
+     * Get the time until the next heartbeat is needed.
+     * @param now The current time
+     * @return The duration in milliseconds before the next heartbeat will be needed.
      */
-    private ClientResponse blockingCoordinatorRequest(ApiKeys api,
-                                                      Struct request,
-                                                      RequestCompletionHandler handler,
-                                                      long now) {
-        while (true) {
-            ClientRequest coordinatorRequest = initiateCoordinatorRequest(api, request, handler, now);
-            ClientResponse coordinatorResponse = sendAndReceive(coordinatorRequest, now);
-            if (coordinatorResponse.wasDisconnected()) {
-                handleCoordinatorDisconnect(coordinatorResponse);
-                Utils.sleep(this.retryBackoffMs);
-            } else {
-                return coordinatorResponse;
-            }
-        }
+    public long timeToNextHeartbeat(long now) {
+        return heartbeat.timeToNextHeartbeat(now);
     }
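timeToNextHeartbeat lets the caller size its poll timeout so a heartbeat is never delayed by a long network wait. A small sketch (remainingMs stands in for whatever time budget the caller has left; it is not a name from this patch):

    long now = time.milliseconds();
    coordinator.maybeHeartbeat(now);            // sends one if due and the coordinator is ready
    long pollTimeout = Math.min(remainingMs,    // caller's own budget (illustrative)
            coordinator.timeToNextHeartbeat(now));
    client.poll(pollTimeout, now);              // wakes up in time for the next heartbeat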
 
     /**
-     * Ensure the consumer coordinator is known and we have a ready connection to it.
+     * Check whether there are any in-flight requests to the coordinator.
+     * @return true if at least one request to the coordinator is still awaiting a response.
      */
-    private void ensureCoordinatorReady() {
-        while (true) {
-            if (this.consumerCoordinator == null)
-                discoverCoordinator();
-
-            while (true) {
-                boolean ready = this.client.ready(this.consumerCoordinator, time.milliseconds());
-                if (ready) {
-                    return;
-                } else {
-                    log.debug("No connection to coordinator, attempting to connect.");
-                    this.client.poll(this.retryBackoffMs, time.milliseconds());
+    public boolean hasInFlightRequests() {
+        return !coordinatorUnknown() && client.inFlightRequestCount(consumerCoordinator.idString()) > 0;
+    }
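hasInFlightRequests gives shutdown paths a way to drain outstanding coordinator requests, such as a final offset commit, before closing. A sketch of that use, assuming the caller is willing to block briefly:

    // Keep polling until every request sent to the coordinator has been
    // answered (or the connection drops and the callbacks are cancelled).
    while (coordinator.hasInFlightRequests())
        client.poll(retryBackoffMs, time.milliseconds());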
 
-                    // if the coordinator connection has failed, we need to
-                    // break the inner loop to re-discover the coordinator
-                    if (this.client.connectionFailed(this.consumerCoordinator)) {
-                        log.debug("Coordinator connection failed. Attempting to re-discover.");
-                        coordinatorDead();
-                        break;
-                    }
-                }
-            }
-        }
+    public boolean coordinatorUnknown() {
+        return this.consumerCoordinator == null;
     }
 
-    /**
-     * Mark the current coordinator as dead.
-     */
-    private void coordinatorDead() {
-        if (this.consumerCoordinator != null) {
-            log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
-            this.consumerCoordinator = null;
-        }
+    private boolean coordinatorReady(long now) {
+        return !coordinatorUnknown() && this.client.ready(this.consumerCoordinator, now);
     }
 
     /**
-     * Keep discovering the consumer coordinator until it is found.
+     * Discover the current coordinator for the consumer group. Sends a ConsumerMetadata request to
+     * one of the brokers. The returned future should be polled to get the result of the request.
+     * @return A request future which indicates the completion of the metadata request
      */
-    private void discoverCoordinator() {
-        while (this.consumerCoordinator == null) {
-            log.debug("No coordinator known, attempting to discover one.");
-            Node coordinator = fetchConsumerCoordinator();
-
-            if (coordinator == null) {
-                log.debug("No coordinator found, backing off.");
-                Utils.sleep(this.retryBackoffMs);
+    public RequestFuture<Void> discoverConsumerCoordinator() {
+        // initiate the consumer metadata request: find a broker node to ask about the coordinator
+        long now = time.milliseconds();
+        Node node = this.client.leastLoadedNode(now);
+
+        if (node == null) {
+            return RequestFuture.metadataRefreshNeeded();
+        } else if (!this.client.ready(node, now)) {
+            if (this.client.connectionFailed(node)) {
+                return RequestFuture.metadataRefreshNeeded();
             } else {
-                log.debug("Found coordinator: " + coordinator);
-                this.consumerCoordinator = coordinator;
+                return RequestFuture.pollNeeded();
             }
+        } else {
+            final RequestFuture<Void> future = new RequestFuture<Void>();
+
+            // create a consumer metadata request
+            log.debug("Issuing consumer metadata request to broker {}", node.id());
+            ConsumerMetadataRequest metadataRequest = new ConsumerMetadataRequest(this.groupId);
+            RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+                @Override
+                public void onComplete(ClientResponse resp) {
+                    handleConsumerMetadataResponse(resp, future);
+                }
+            };
+            send(node, ApiKeys.CONSUMER_METADATA, metadataRequest.toStruct(), completionHandler, now);
+            return future;
         }
     }
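Discovery is likewise caller-driven: the returned future either tracks an in-flight ConsumerMetadata request, or completes immediately with a hint that the caller should poll or refresh metadata first. A sketch of a driver loop; coordinatorUnknown() is from this patch, while succeeded() is an assumed accessor:

    // Hypothetical loop that re-attempts discovery until a coordinator is known.
    while (coordinator.coordinatorUnknown()) {
        RequestFuture<Void> discovery = coordinator.discoverConsumerCoordinator();
        while (!discovery.isDone())
            client.poll(retryBackoffMs, time.milliseconds());
        if (!discovery.succeeded())
            client.poll(retryBackoffMs, time.milliseconds()); // finish connecting or pick up fresh metadata
    }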
 
-    /**
-     * Get the current consumer coordinator information via consumer metadata request.
-     *
-     * @return the consumer coordinator node
-     */
-    private Node fetchConsumerCoordinator() {
-
-        // initiate the consumer metadata request
-        ClientRequest request = initiateConsumerMetadataRequest();
-
-        // send the request and wait for its response
-        ClientResponse response = sendAndReceive(request, request.createdTime());
+    private void handleConsumerMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
+        log.debug("Consumer metadata response {}", resp);
 
         // parse the response to get the coordinator info if it is not disconnected,
         // otherwise we need to request metadata update
-        if (!response.wasDisconnected()) {
-            ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(response.responseBody());
+        if (resp.wasDisconnected()) {
+            future.retryAfterMetadataRefresh();
+        } else {
+            ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(resp.responseBody());
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
-            if (consumerMetadataResponse.errorCode() == Errors.NONE.code())
-                return new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
-                    consumerMetadataResponse.node().host(),
-                    consumerMetadataResponse.node().port());
-        } else {
-            this.metadata.requestUpdate();
+            if (consumerMetadataResponse.errorCode() == Errors.NONE.code()) {
+                this.consumerCoordinator = new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
+                        consumerMetadataResponse.node().host(),
+                        consumerMetadataResponse.node().port());
+                future.complete(null);
+            } else {
+                future.retryAfterBackoff();
+            }
         }
+    }
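The MAX_VALUE - node.id remapping above is easy to misread, so a concrete example: for broker 0 the coordinator is tracked under a distinct node id, which makes the NetworkClient keep a dedicated connection for coordinator traffic (the workaround KAFKA-1935 is meant to replace). Host and port below are illustrative:

    Node broker = new Node(0, "broker0.example.com", 9092);
    Node coordinatorNode = new Node(Integer.MAX_VALUE - broker.id(), // 2147483647 - 0
            broker.host(), broker.port());
    // same physical broker, but a separate connection key in the client layer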
 
-        return null;
+    /**
+     * Mark the current coordinator as dead.
+     */
+    private void coordinatorDead() {
+        if (this.consumerCoordinator != null) {
+            log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
+            this.consumerCoordinator = null;
+        }
     }
 
     /**
@@ -414,79 +423,23 @@ public final class Coordinator {
     private void handleCoordinatorDisconnect(ClientResponse response) {
         int correlation = response.request().request().header().correlationId();
         log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
-            response.request(),
-            correlation,
-            response.request().request().destination());
+                response.request(),
+                correlation,
+                response.request().request().destination());
 
         // mark the coordinator as dead
         coordinatorDead();
     }
 
-    /**
-     * Initiate a consumer metadata request to the least loaded node.
-     *
-     * @return The created request
-     */
-    private ClientRequest initiateConsumerMetadataRequest() {
 
-        // find a node to ask about the coordinator
-        Node node = this.client.leastLoadedNode(time.milliseconds());
-        while (node == null || !this.client.ready(node, time.milliseconds())) {
-            long now = time.milliseconds();
-            this.client.poll(this.retryBackoffMs, now);
-            node = this.client.leastLoadedNode(now);
-
-            // if there is no ready node, backoff before retry
-            if (node == null)
-                Utils.sleep(this.retryBackoffMs);
-        }
-
-        // create a consumer metadata request
-        log.debug("Issuing consumer metadata request to broker {}", node.id());
-
-        ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.groupId);
-        RequestSend send = new RequestSend(node.idString(),
-            this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
-            request.toStruct());
-        long now = time.milliseconds();
-        return new ClientRequest(now, true, send, null);
+    private void sendCoordinator(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
+        send(this.consumerCoordinator, api, request, handler, now);
     }
 
-    /**
-     * Initiate a request to the coordinator.
-     */
-    private ClientRequest initiateCoordinatorRequest(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
-
-        // first make sure the coordinator is known and ready
-        ensureCoordinatorReady();
-
-        // create the request for the coordinator
-        log.debug("Issuing request ({}: {}) to coordinator {}", api, request, this.consumerCoordinator.id());
-
+    private void send(Node node, ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
         RequestHeader header = this.client.nextRequestHeader(api);
-        RequestSend send = new RequestSend(this.consumerCoordinator.idString(), header, request);
-        return new ClientRequest(now, true, send, handler);
-    }
-
-    /**
-     * Attempt to send a request and receive its response.
-     *
-     * @return The response
-     */
-    private ClientResponse sendAndReceive(ClientRequest clientRequest, long now) {
-
-        // send the request
-        this.client.send(clientRequest);
-
-        // drain all responses from the destination node
-        List<ClientResponse> responses = this.client.completeAll(clientRequest.request().destination(), now);
-        if (responses.isEmpty()) {
-            throw new IllegalStateException("This should not happen.");
-        } else {
-            // other requests should be handled by the callback, and
-            // we only care about the response of the last request
-            return responses.get(responses.size() - 1);
-        }
+        RequestSend send = new RequestSend(node.idString(), header, request);
+        this.client.send(new ClientRequest(now, true, send, handler));
     }
 
     private class HeartbeatCompletionHandler implements RequestCompletionHandler {
@@ -521,18 +474,21 @@ public final class Coordinator {
     private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
 
         private final Map<TopicPartition, Long> offsets;
+        private final RequestFuture<Void> future;
 
-        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets) {
+        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
             this.offsets = offsets;
+            this.future = future;
         }
 
         @Override
         public void onComplete(ClientResponse resp) {
             if (resp.wasDisconnected()) {
                 handleCoordinatorDisconnect(resp);
+                future.retryWithNewCoordinator();
             } else {
-                OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
-                for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
+                OffsetCommitResponse commitResponse = new OffsetCommitResponse(resp.responseBody());
+                for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                     TopicPartition tp = entry.getKey();
                     short errorCode = entry.getValue();
                     long offset = this.offsets.get(tp);
@@ -542,14 +498,19 @@ public final class Coordinator {
                     } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
                             || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                         coordinatorDead();
+                        future.retryWithNewCoordinator();
                     } else {
                         // do not need to throw the exception but just log the error
+                        future.retryAfterBackoff();
                         log.error("Error committing partition {} at offset {}: {}",
                             tp,
                             offset,
                             Errors.forCode(errorCode).exception().getMessage());
                     }
                 }
+
+                if (!future.isDone())
+                    future.complete(null);
             }
             sensors.commitLatency.record(resp.requestLatencyMs());
         }