Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/15 21:40:46 UTC

[3/3] kafka git commit: KAFKA-2123: add callback in commit api and use a delayed queue for async requests; reviewed by Ewen Cheslack-Postava and Guozhang Wang

KAFKA-2123: add callback in commit api and use a delayed queue for async requests; reviewed by Ewen Cheslack-Postava and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99c0686b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99c0686b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99c0686b

Branch: refs/heads/trunk
Commit: 99c0686be2141a0fffe1c55e279370a87ef8c1ea
Parents: a7e0ac3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jul 15 12:38:45 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 15 12:38:45 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java |  10 +
 .../consumer/ConsumerCommitCallback.java        |  33 +
 .../consumer/ConsumerRebalanceCallback.java     |   6 +-
 .../kafka/clients/consumer/ConsumerRecords.java |   4 +
 .../kafka/clients/consumer/KafkaConsumer.java   | 512 ++++----------
 .../kafka/clients/consumer/MockConsumer.java    |  19 +-
 .../internals/ConsumerNetworkClient.java        | 296 +++++++++
 .../clients/consumer/internals/Coordinator.java | 660 ++++++++++++-------
 .../clients/consumer/internals/DelayedTask.java |  24 +
 .../consumer/internals/DelayedTaskQueue.java    |  96 +++
 .../clients/consumer/internals/Fetcher.java     | 182 +++--
 .../clients/consumer/internals/Heartbeat.java   |  27 +-
 .../internals/NoAvailableBrokersException.java  |  23 +
 .../consumer/internals/RequestFuture.java       | 219 +++---
 .../internals/RequestFutureAdapter.java         |  28 +
 .../internals/RequestFutureListener.java        |  23 +
 .../consumer/internals/SendFailedException.java |  27 +
 .../internals/StaleMetadataException.java       |  22 +
 .../consumer/internals/SubscriptionState.java   |   5 +-
 ...onsumerCoordinatorNotAvailableException.java |  40 ++
 .../common/errors/DisconnectException.java      |  39 ++
 .../errors/IllegalGenerationException.java      |  33 +
 .../NotCoordinatorForConsumerException.java     |  40 ++
 .../errors/OffsetLoadInProgressException.java   |  40 ++
 .../errors/UnknownConsumerIdException.java      |  33 +
 .../apache/kafka/common/protocol/Errors.java    |  10 +-
 .../internals/ConsumerNetworkClientTest.java    | 125 ++++
 .../consumer/internals/CoordinatorTest.java     | 479 +++++++++-----
 .../internals/DelayedTaskQueueTest.java         |  89 +++
 .../clients/consumer/internals/FetcherTest.java |  37 +-
 .../consumer/internals/HeartbeatTest.java       |  15 +
 .../consumer/internals/RequestFutureTest.java   |  57 ++
 .../integration/kafka/api/ConsumerTest.scala    |  81 ++-
 33 files changed, 2350 insertions(+), 984 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index fd98740..252b759 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -64,11 +64,21 @@ public interface Consumer<K, V> extends Closeable {
     public void commit(CommitType commitType);
 
     /**
+     * @see KafkaConsumer#commit(CommitType, ConsumerCommitCallback)
+     */
+    public void commit(CommitType commitType, ConsumerCommitCallback callback);
+
+    /**
      * @see KafkaConsumer#commit(Map, CommitType)
      */
     public void commit(Map<TopicPartition, Long> offsets, CommitType commitType);
 
     /**
+     * @see KafkaConsumer#commit(Map, CommitType, ConsumerCommitCallback)
+     */
+    public void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback);
+
+    /**
      * @see KafkaConsumer#seek(TopicPartition, long)
      */
     public void seek(TopicPartition partition, long offset);
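
The two new overloads make the commit result observable without blocking. A minimal usage sketch (the surrounding method, topic name, and offset value are illustrative, not part of this commit):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    void commitWithCallback(Consumer<String, String> consumer) {
        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(new TopicPartition("my-topic", 0), 42L); // hypothetical position
        // returns immediately; the callback fires from a later poll() once acked
        consumer.commit(offsets, CommitType.ASYNC, new ConsumerCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, Long> committed, Exception exception) {
                if (exception != null)
                    System.err.println("commit failed: " + exception);
            }
        });
    }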

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
new file mode 100644
index 0000000..f084385
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+/**
+ * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
+ * may be executed in any thread calling {@link Consumer#poll(long) poll()}.
+ */
+public interface ConsumerCommitCallback {
+
+    /**
+     * A callback method the user can implement to provide asynchronous handling of commit request completion.
+     * This method will be called when the commit request sent to the server has been acknowledged.
+     *
+     * @param offsets A map of the offsets that this callback applies to
+     * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
+     */
+    void onComplete(Map<TopicPartition, Long> offsets, Exception exception);
+}
\ No newline at end of file
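
A commit callback is just a completion hook; a reusable logging implementation might look like this (a sketch, not part of the patch):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    public class LoggingCommitCallback implements ConsumerCommitCallback {
        @Override
        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
            if (exception == null)
                System.out.println("committed offsets for " + offsets.size() + " partition(s)");
            else
                System.err.println("commit of " + offsets + " failed: " + exception);
        }
    }

Because onComplete runs inside a poll() call, implementations should return quickly.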

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
index 74dfdba..ff3f50f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
@@ -73,7 +73,8 @@ public interface ConsumerRebalanceCallback {
      * It is guaranteed that all the processes in a consumer group will execute their
      * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
      * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
-     * 
+     *
+     * @param consumer Reference to the consumer for convenience
      * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
      *            assigned to the consumer)
      */
@@ -86,7 +87,8 @@ public interface ConsumerRebalanceCallback {
      * custom offset store to prevent duplicate data
      * <p>
      * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
-     * 
+     *
+     * @param consumer Reference to the consumer for convenience
      * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
      */
     public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index eb75d2e..16a8357 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -105,6 +105,10 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
         }
     }
 
+    public boolean isEmpty() {
+        return records.isEmpty();
+    }
+
     @SuppressWarnings("unchecked")
     public static <K, V> ConsumerRecords<K, V> empty() {
         return (ConsumerRecords<K, V>) EMPTY;

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b4e8f7f..9f64255 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -15,9 +15,10 @@ package org.apache.kafka.clients.consumer;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Coordinator;
+import org.apache.kafka.clients.consumer.internals.DelayedTask;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.RequestFuture;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -49,7 +50,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -394,8 +394,6 @@ import static org.apache.kafka.common.utils.Utils.min;
 public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
-    private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
-    private static final long LATEST_OFFSET_TIMESTAMP = -1L;
     private static final long NO_CURRENT_THREAD = -1L;
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
 
@@ -405,17 +403,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Fetcher<K, V> fetcher;
 
     private final Time time;
-    private final NetworkClient client;
+    private final ConsumerNetworkClient client;
     private final Metrics metrics;
     private final SubscriptionState subscriptions;
     private final Metadata metadata;
     private final long retryBackoffMs;
     private final boolean autoCommit;
     private final long autoCommitIntervalMs;
-    private final ConsumerRebalanceCallback rebalanceCallback;
-    private long lastCommitAttemptMs;
     private boolean closed = false;
-    private final AtomicBoolean wakeup = new AtomicBoolean(false);
 
     // currentThread holds the threadId of the current thread accessing KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -507,14 +502,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             log.debug("Starting the Kafka consumer");
             if (callback == null)
-                this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+                callback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
                         ConsumerRebalanceCallback.class);
-            else
-                this.rebalanceCallback = callback;
             this.time = new SystemTime();
             this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
             this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-            this.lastCommitAttemptMs = time.milliseconds();
 
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
@@ -535,7 +527,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             String metricGrpPrefix = "consumer";
             Map<String, String> metricsTags = new LinkedHashMap<String, String>();
             metricsTags.put("client-id", clientId);
-            this.client = new NetworkClient(
+            NetworkClient netClient = new NetworkClient(
                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags),
                     this.metadata,
                     clientId,
@@ -543,6 +535,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
             this.coordinator = new Coordinator(this.client,
@@ -553,8 +546,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     metrics,
                     metricGrpPrefix,
                     metricsTags,
-                    this.time);
-
+                    this.time,
+                    requestTimeoutMs,
+                    retryBackoffMs,
+                    wrapRebalanceCallback(callback));
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);
@@ -581,10 +576,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     metrics,
                     metricGrpPrefix,
                     metricsTags,
-                    this.time);
+                    this.time,
+                    this.retryBackoffMs);
 
             config.logUnused();
 
+            if (autoCommit)
+                scheduleAutoCommitTask(autoCommitIntervalMs);
+
             log.debug("Kafka consumer created");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
@@ -719,27 +718,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (timeout < 0)
                 throw new IllegalArgumentException("Timeout must not be negative");
 
-            // Poll for new data until the timeout expires
+            // poll for new data until the timeout expires
             long remaining = timeout;
             while (remaining >= 0) {
                 long start = time.milliseconds();
-                long pollTimeout = min(remaining, timeToNextCommit(start), coordinator.timeToNextHeartbeat(start));
-
-                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(pollTimeout, start);
+                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                 long end = time.milliseconds();
 
                 if (!records.isEmpty()) {
-                    // If data is available, then return it, but first send off the
+                    // if data is available, then return it, but first send off the
                     // next round of fetches to enable pipelining while the user is
                     // handling the fetched records.
-                    fetcher.initFetches(metadata.fetch(), end);
-                    pollClient(0, end);
+                    fetcher.initFetches(metadata.fetch());
+                    client.poll(0);
                     return new ConsumerRecords<K, V>(records);
                 }
 
                 remaining -= end - start;
 
-                // Nothing was available, so we should backoff before retrying
+                // nothing was available, so we should back off before retrying
                 if (remaining > 0) {
                     Utils.sleep(min(remaining, retryBackoffMs));
                     remaining -= time.milliseconds() - end;
@@ -752,46 +749,42 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
-
     /**
      * Do one round of polling. In addition to checking for new data, this does any needed
      * heart-beating, auto-commits, and offset updates.
      * @param timeout The maximum time to block in the underlying poll
-     * @param now Current time in millis
      * @return The fetched records (may be empty)
      */
-    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout, long now) {
-        Cluster cluster = this.metadata.fetch();
-
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
+        coordinator.ensureCoordinatorKnown();
 
-        if (subscriptions.partitionsAutoAssigned()) {
-            if (subscriptions.partitionAssignmentNeeded()) {
-                // rebalance to get partition assignment
-                reassignPartitions(now);
-            } else {
-                // try to heartbeat with the coordinator if needed
-                coordinator.maybeHeartbeat(now);
-            }
-        }
+        // ensure we have partitions assigned if we expect to
+        if (subscriptions.partitionsAutoAssigned())
+            coordinator.ensurePartitionAssignment();
 
         // fetch positions if we have partitions we're subscribed to that we
         // don't know the offset for
         if (!subscriptions.hasAllFetchPositions())
             updateFetchPositions(this.subscriptions.missingFetchPositions());
 
-        // maybe autocommit position
-        if (shouldAutoCommit(now))
-            commit(CommitType.ASYNC);
-
-        // Init any new fetches (won't resend pending fetches)
-        fetcher.initFetches(cluster, now);
-
-        pollClient(timeout, now);
-
+        // init any new fetches (won't resend pending fetches)
+        Cluster cluster = this.metadata.fetch();
+        fetcher.initFetches(cluster);
+        client.poll(timeout);
         return fetcher.fetchedRecords();
     }
 
+    private void scheduleAutoCommitTask(final long interval) {
+        DelayedTask task = new DelayedTask() {
+            public void run(long now) {
+                commit(CommitType.ASYNC);
+                client.schedule(this, now + interval);
+            }
+        };
+        client.schedule(task, time.milliseconds() + interval);
+    }
+
     /**
      * Commits the specified offsets for the specified list of topics and partitions to Kafka.
      * <p>
@@ -799,25 +792,42 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
-     * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
-     * the commit succeeds.
-     * 
+     * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+     * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
+     * {@link #commit(Map, CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
+     * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
+     * to the caller).
+     *
      * @param offsets The list of offsets per partition that should be committed to Kafka.
      * @param commitType Control whether the commit is blocking
      */
     @Override
     public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
+        commit(offsets, commitType, null);
+    }
+
+    /**
+     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * <p>
+     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * should not be used.
+     * <p>
+     * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+     * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
+     * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
+     * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+     *
+     * @param offsets The list of offsets per partition that should be committed to Kafka.
+     * @param commitType Control whether the commit is blocking
+     * @param callback Callback to invoke when the commit completes
+     */
+    @Override
+    public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
         acquire();
         try {
             log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
-
-            this.lastCommitAttemptMs = time.milliseconds();
-
-            // commit the offsets with the coordinator
-            if (commitType == CommitType.ASYNC)
-                this.subscriptions.needRefreshCommits();
-            commitOffsets(offsets, commitType);
+            coordinator.commitOffsets(offsets, commitType, callback);
         } finally {
             release();
         }
@@ -829,22 +839,48 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
      * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
-     * 
+     * <p>
+     * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+     * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
+     * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
+     * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+     *
      * @param commitType Whether or not the commit should block until it is acknowledged.
+     * @param callback Callback to invoke when the commit completes
      */
     @Override
-    public void commit(CommitType commitType) {
+    public void commit(CommitType commitType, ConsumerCommitCallback callback) {
         acquire();
         try {
-            // Need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
+            // need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
             Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
-            commit(allConsumed, commitType);
+            commit(allConsumed, commitType, callback);
         } finally {
             release();
         }
     }
 
     /**
+     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * <p>
+     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * should not be used.
+     * <p>
+     * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+     * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
+     * {@link #commit(CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
+     * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
+     * to the caller).
+     *
+     * @param commitType Whether or not the commit should block until it is acknowledged.
+     */
+    @Override
+    public void commit(CommitType commitType) {
+        commit(commitType, null);
+    }
+
+    /**
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
@@ -868,8 +904,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
                     : Arrays.asList(partitions);
-            for (TopicPartition tp : parts)
+            for (TopicPartition tp : parts) {
+                log.debug("Seeking to beginning of partition {}", tp);
                 subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+            }
         } finally {
             release();
         }
@@ -883,8 +921,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
                     : Arrays.asList(partitions);
-            for (TopicPartition tp : parts)
+            for (TopicPartition tp : parts) {
+                log.debug("Seeking to end of partition {}", tp);
                 subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+            }
         } finally {
             release();
         }
@@ -931,19 +971,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public long committed(TopicPartition partition) {
         acquire();
         try {
-            Set<TopicPartition> partitionsToFetch;
+            Long committed;
             if (subscriptions.assignedPartitions().contains(partition)) {
-                Long committed = this.subscriptions.committed(partition);
-                if (committed != null)
-                    return committed;
-                partitionsToFetch = subscriptions.assignedPartitions();
+                committed = this.subscriptions.committed(partition);
+                if (committed == null) {
+                    coordinator.refreshCommittedOffsetsIfNeeded();
+                    committed = this.subscriptions.committed(partition);
+                }
             } else {
-                partitionsToFetch = Collections.singleton(partition);
+                Map<TopicPartition, Long> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
+                committed = offsets.get(partition);
             }
-            refreshCommittedOffsets(partitionsToFetch);
-            Long committed = this.subscriptions.committed(partition);
+
             if (committed == null)
                 throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
+
             return committed;
         } finally {
             release();
@@ -973,7 +1015,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
             if (parts == null) {
                 metadata.add(topic);
-                awaitMetadataUpdate();
+                client.awaitMetadataUpdate();
                 parts = metadata.fetch().partitionsForTopic(topic);
             }
             return parts;
@@ -999,7 +1041,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void wakeup() {
-        this.wakeup.set(true);
         this.client.wakeup();
     }
 
@@ -1017,55 +1058,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
+    private Coordinator.RebalanceCallback wrapRebalanceCallback(final ConsumerRebalanceCallback callback) {
+        return new Coordinator.RebalanceCallback() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                callback.onPartitionsAssigned(KafkaConsumer.this, partitions);
+            }
 
-    private boolean shouldAutoCommit(long now) {
-        return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
-    }
-
-    private long timeToNextCommit(long now) {
-        if (!this.autoCommit)
-            return Long.MAX_VALUE;
-        long timeSinceLastCommit = now - this.lastCommitAttemptMs;
-        if (timeSinceLastCommit > this.autoCommitIntervalMs)
-            return 0;
-        return this.autoCommitIntervalMs - timeSinceLastCommit;
-    }
-
-    /**
-     * Request a metadata update and wait until it has occurred
-     */
-    private void awaitMetadataUpdate() {
-        int version = this.metadata.requestUpdate();
-        do {
-            long now = time.milliseconds();
-            this.pollClient(this.retryBackoffMs, now);
-        } while (this.metadata.version() == version);
-    }
-
-    /**
-     * Get partition assignment
-     */
-    private void reassignPartitions(long now) {
-        // execute the user's callback before rebalance
-        log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
-        try {
-            this.rebalanceCallback.onPartitionsRevoked(this, this.subscriptions.assignedPartitions());
-        } catch (Exception e) {
-            log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
-                + " failed on partition revocation: ", e);
-        }
-
-        // get new assigned partitions from the coordinator
-        assignPartitions();
-
-        // execute the user's callback after rebalance
-        log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
-        try {
-            this.rebalanceCallback.onPartitionsAssigned(this, this.subscriptions.assignedPartitions());
-        } catch (Exception e) {
-            log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
-                + " failed on partition assignment: ", e);
-        }
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                callback.onPartitionsRevoked(KafkaConsumer.this, partitions);
+            }
+        };
     }
 
     /**
@@ -1077,267 +1081,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             defined
      */
     private void updateFetchPositions(Set<TopicPartition> partitions) {
-        // first refresh the committed positions in case they are not up-to-date
-        refreshCommittedOffsets(partitions);
-
-        // reset the fetch position to the committed position
-        for (TopicPartition tp : partitions) {
-            // Skip if we already have a fetch position
-            if (subscriptions.fetched(tp) != null)
-                continue;
-
-            // TODO: If there are several offsets to reset, we could submit offset requests in parallel
-            if (subscriptions.isOffsetResetNeeded(tp)) {
-                resetOffset(tp);
-            } else if (subscriptions.committed(tp) == null) {
-                // There's no committed position, so we need to reset with the default strategy
-                subscriptions.needOffsetReset(tp);
-                resetOffset(tp);
-            } else {
-                log.debug("Resetting offset for partition {} to the committed offset {}",
-                    tp, subscriptions.committed(tp));
-                subscriptions.seek(tp, subscriptions.committed(tp));
-            }
-        }
-    }
+        // refresh commits for all assigned partitions
+        coordinator.refreshCommittedOffsetsIfNeeded();
 
-    /**
-     * Reset offsets for the given partition using the offset reset strategy.
-     *
-     * @param partition The given partition that needs reset offset
-     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
-     */
-    private void resetOffset(TopicPartition partition) {
-        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
-        final long timestamp;
-        if (strategy == OffsetResetStrategy.EARLIEST)
-            timestamp = EARLIEST_OFFSET_TIMESTAMP;
-        else if (strategy == OffsetResetStrategy.LATEST)
-            timestamp = LATEST_OFFSET_TIMESTAMP;
-        else
-            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
-
-        log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
-        long offset = listOffset(partition, timestamp);
-        this.subscriptions.seek(partition, offset);
-    }
-
-    /**
-     * Fetch a single offset before the given timestamp for the partition.
-     *
-     * @param partition The partition that needs fetching offset.
-     * @param timestamp The timestamp for fetching offset.
-     * @return The offset of the message that is published before the given timestamp
-     */
-    private long listOffset(TopicPartition partition, long timestamp) {
-        while (true) {
-            RequestFuture<Long> future = fetcher.listOffset(partition, timestamp);
-
-            if (!future.isDone())
-                pollFuture(future, requestTimeoutMs);
-
-            if (future.isDone()) {
-                if (future.succeeded())
-                    return future.value();
-                handleRequestFailure(future);
-            }
-        }
-    }
-
-    /**
-     * Refresh the committed offsets for given set of partitions and update the cache
-     */
-    private void refreshCommittedOffsets(Set<TopicPartition> partitions) {
-        // we only need to fetch latest committed offset from coordinator if there
-        // is some commit process in progress, otherwise our current
-        // committed cache is up-to-date
-        if (subscriptions.refreshCommitsNeeded()) {
-            // contact coordinator to fetch committed offsets
-            Map<TopicPartition, Long> offsets = fetchCommittedOffsets(partitions);
-
-            // update the position with the offsets
-            for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
-                TopicPartition tp = entry.getKey();
-                this.subscriptions.committed(tp, entry.getValue());
-            }
-        }
-    }
-
-    /**
-     * Block until we have received a partition assignment from the coordinator.
-     */
-    private void assignPartitions() {
-        // Ensure that there are no pending requests to the coordinator. This is important
-        // in particular to avoid resending a pending JoinGroup request.
-        awaitCoordinatorInFlightRequests();
-
-        while (subscriptions.partitionAssignmentNeeded()) {
-            RequestFuture<Void> future = coordinator.assignPartitions(time.milliseconds());
-
-            // Block indefinitely for the join group request (which can take as long as a session timeout)
-            if (!future.isDone())
-                pollFuture(future);
-
-            if (future.failed())
-                handleRequestFailure(future);
-        }
-    }
-
-    /**
-     * Block until the coordinator for this group is known.
-     */
-    private void ensureCoordinatorKnown() {
-        while (coordinator.coordinatorUnknown()) {
-            RequestFuture<Void> future = coordinator.discoverConsumerCoordinator();
-
-            if (!future.isDone())
-                pollFuture(future, requestTimeoutMs);
-
-            if (future.failed())
-                handleRequestFailure(future);
-        }
-    }
-
-    /**
-     * Block until any pending requests to the coordinator have been handled.
-     */
-    public void awaitCoordinatorInFlightRequests() {
-        while (coordinator.hasInFlightRequests()) {
-            long now = time.milliseconds();
-            pollClient(-1, now);
-        }
-    }
-
-    /**
-     * Lookup the committed offsets for a set of partitions. This will block until the coordinator has
-     * responded to the offset fetch request.
-     * @param partitions List of partitions to get offsets for
-     * @return Map from partition to its respective offset
-     */
-    private Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
-        while (true) {
-            long now = time.milliseconds();
-            RequestFuture<Map<TopicPartition, Long>> future = coordinator.fetchOffsets(partitions, now);
-
-            if (!future.isDone())
-                pollFuture(future, requestTimeoutMs);
-
-            if (future.isDone()) {
-                if (future.succeeded())
-                    return future.value();
-                handleRequestFailure(future);
-            }
-        }
-    }
-
-    /**
-     * Commit offsets. This call blocks (regardless of commitType) until the coordinator
-     * can receive the commit request. Once the request has been made, however, only the
-     * synchronous commits will wait for a successful response from the coordinator.
-     * @param offsets Offsets to commit.
-     * @param commitType Commit policy
-     */
-    private void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType) {
-        if (commitType == CommitType.ASYNC) {
-            commitOffsetsAsync(offsets);
-        } else {
-            commitOffsetsSync(offsets);
-        }
-    }
-
-    private void commitOffsetsAsync(Map<TopicPartition, Long> offsets) {
-        while (true) {
-            long now = time.milliseconds();
-            RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
-
-            if (!future.isDone() || future.succeeded())
-                return;
-
-            handleRequestFailure(future);
-        }
-    }
-
-    private void commitOffsetsSync(Map<TopicPartition, Long> offsets) {
-        while (true) {
-            long now = time.milliseconds();
-            RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
-
-            if (!future.isDone())
-                pollFuture(future, requestTimeoutMs);
-
-            if (future.isDone()) {
-                if (future.succeeded())
-                    return;
-                else
-                    handleRequestFailure(future);
-            }
-        }
-    }
-
-    private void handleRequestFailure(RequestFuture<?> future) {
-        if (future.hasException())
-            throw future.exception();
-
-        switch (future.retryAction()) {
-            case BACKOFF:
-                Utils.sleep(retryBackoffMs);
-                break;
-            case POLL:
-                pollClient(retryBackoffMs, time.milliseconds());
-                break;
-            case FIND_COORDINATOR:
-                ensureCoordinatorKnown();
-                break;
-            case REFRESH_METADATA:
-                awaitMetadataUpdate();
-                break;
-            case NOOP:
-                // Do nothing (retry now)
-        }
-    }
-
-    /**
-     * Poll until a result is ready or timeout expires
-     * @param future The future to poll for
-     * @param timeout The time in milliseconds to wait for the result
-     */
-    private void pollFuture(RequestFuture<?> future, long timeout) {
-        // TODO: Update this code for KAFKA-2120, which adds request timeout to NetworkClient
-        // In particular, we must ensure that "timed out" requests will not have their callbacks
-        // invoked at a later time.
-        long remaining = timeout;
-        while (!future.isDone() && remaining >= 0) {
-            long start = time.milliseconds();
-            pollClient(remaining, start);
-            if (future.isDone()) return;
-            remaining -= time.milliseconds() - start;
-        }
-    }
-
-    /**
-     * Poll indefinitely until the result is ready.
-     * @param future The future to poll for.
-     */
-    private void pollFuture(RequestFuture<?> future) {
-        while (!future.isDone()) {
-            long now = time.milliseconds();
-            pollClient(-1, now);
-        }
-    }
-
-    /**
-     * Poll for IO.
-     * @param timeout The maximum time to wait for IO to become available
-     * @param now The current time in milliseconds
-     * @throws ConsumerWakeupException if {@link #wakeup()} is invoked while the poll is active
-     */
-    private void pollClient(long timeout, long now) {
-        this.client.poll(timeout, now);
-
-        if (wakeup.get()) {
-            wakeup.set(false);
-            throw new ConsumerWakeupException();
-        }
+        // then do any offset lookups in case some positions are not known
+        fetcher.updateFetchPositions(partitions);
     }
 
     /*
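
Taken together, the new commit contract is: SYNC blocks and throws unrecoverable errors to the caller; ASYNC never blocks and reports errors only through the optional callback. A sketch of the three modes (the consumer variable and error handling are illustrative):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    void commitModes(Consumer<byte[], byte[]> consumer) {
        try {
            consumer.commit(CommitType.SYNC);   // blocks until acked
        } catch (KafkaException e) {
            // unrecoverable commit failure is thrown to the caller
        }

        consumer.commit(CommitType.ASYNC);      // errors silently discarded

        consumer.commit(CommitType.ASYNC, new ConsumerCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
                if (exception != null) {
                    // error surfaced here instead of being discarded
                }
            }
        });
    }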

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 46e26a6..c14eed1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -106,16 +106,29 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
         ensureNotClosed();
         for (Entry<TopicPartition, Long> entry : offsets.entrySet())
             subscriptions.committed(entry.getKey(), entry.getValue());
+        if (callback != null) {
+            callback.onComplete(offsets, null);
+        }
     }
 
     @Override
-    public synchronized void commit(CommitType commitType) {
+    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+        commit(offsets, commitType, null);
+    }
+
+    @Override
+    public synchronized void commit(CommitType commitType, ConsumerCommitCallback callback) {
         ensureNotClosed();
-        commit(this.subscriptions.allConsumed(), commitType);
+        commit(this.subscriptions.allConsumed(), commitType, callback);
+    }
+
+    @Override
+    public synchronized void commit(CommitType commitType) {
+        commit(commitType, null);
     }
 
     @Override
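
Since MockConsumer completes commits in-line, the callback has already run by the time commit() returns, which keeps tests synchronous. A rough test sketch (assumes the no-arg MockConsumer constructor and JUnit asserts):

    import static org.junit.Assert.*;

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.junit.Test;

    public class MockCommitCallbackTest {
        @Test
        public void commitCallbackRunsSynchronously() {
            MockConsumer<String, String> consumer = new MockConsumer<String, String>();
            TopicPartition tp = new TopicPartition("test", 0);
            consumer.subscribe(tp);

            final AtomicReference<Exception> error = new AtomicReference<Exception>();
            consumer.commit(Collections.singletonMap(tp, 5L), CommitType.ASYNC,
                    new ConsumerCommitCallback() {
                        @Override
                        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
                            error.set(exception);
                        }
                    });

            assertNull(error.get());                  // callback already invoked
            assertEquals(5L, consumer.committed(tp)); // offset recorded by the mock
        }
    }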

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
new file mode 100644
index 0000000..9517d9d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.utils.Time;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Higher level consumer access to the network layer with basic support for futures and
+ * task scheduling. NOT thread-safe!
+ *
+ * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
+ * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
+ * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
+ * understand, but there are opportunities to provide timeout or retry capabilities in the future.
+ * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
+ */
+public class ConsumerNetworkClient implements Closeable {
+    private final KafkaClient client;
+    private final AtomicBoolean wakeup = new AtomicBoolean(false);
+    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
+    private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
+    private final Metadata metadata;
+    private final Time time;
+    private final long retryBackoffMs;
+
+    public ConsumerNetworkClient(KafkaClient client,
+                                 Metadata metadata,
+                                 Time time,
+                                 long retryBackoffMs) {
+        this.client = client;
+        this.metadata = metadata;
+        this.time = time;
+        this.retryBackoffMs = retryBackoffMs;
+    }
+
+    /**
+     * Schedule a new task to be executed at the given time. This is "best-effort" scheduling and
+     * should only be used for coarse synchronization.
+     * @param task The task to be scheduled
+     * @param at The time it should run
+     */
+    public void schedule(DelayedTask task, long at) {
+        delayedTasks.add(task, at);
+    }
+
+    /**
+     * Unschedule a task. This will remove all instances of the task from the task queue.
+     * This is a no-op if the task is not scheduled.
+     * @param task The task to be unscheduled.
+     */
+    public void unschedule(DelayedTask task) {
+        delayedTasks.remove(task);
+    }
+
+    /**
+     * Send a new request. Note that the request is not actually transmitted on the
+     * network until one of the {@link #poll(long)} variants is invoked. At this
+     * point the request will either be transmitted successfully or will fail.
+     * Use the returned future to obtain the result of the send.
+     * @param node The destination of the request
+     * @param api The Kafka API call
+     * @param request The request payload
+     * @return A future which indicates the result of the send.
+     */
+    public RequestFuture<ClientResponse> send(Node node,
+                                              ApiKeys api,
+                                              AbstractRequest request) {
+        long now = time.milliseconds();
+        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
+        RequestHeader header = client.nextRequestHeader(api);
+        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
+        put(node, new ClientRequest(now, true, send, future));
+        return future;
+    }
+
+    private void put(Node node, ClientRequest request) {
+        List<ClientRequest> nodeUnsent = unsent.get(node);
+        if (nodeUnsent == null) {
+            nodeUnsent = new ArrayList<ClientRequest>();
+            unsent.put(node, nodeUnsent);
+        }
+        nodeUnsent.add(request);
+    }
+
+    public Node leastLoadedNode() {
+        return client.leastLoadedNode(time.milliseconds());
+    }
+
+    /**
+     * Block until the metadata has been refreshed.
+     */
+    public void awaitMetadataUpdate() {
+        int version = this.metadata.requestUpdate();
+        do {
+            poll(Long.MAX_VALUE);
+        } while (this.metadata.version() == version);
+    }
+
+    /**
+     * Wake up an active poll. This will cause the polling thread to throw an exception either
+     * on the current poll if one is active, or the next poll.
+     */
+    public void wakeup() {
+        this.wakeup.set(true);
+        this.client.wakeup();
+    }
+
+    /**
+     * Block indefinitely until the given request future has finished.
+     * @param future The request future to await.
+     * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     */
+    public void poll(RequestFuture<?> future) {
+        while (!future.isDone())
+            poll(Long.MAX_VALUE);
+    }
+
+    /**
+     * Block until the provided request future has finished or the timeout has expired.
+     * @param future The request future to wait for
+     * @param timeout The maximum duration (in ms) to wait for the request
+     * @return true if the future is done, false otherwise
+     * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     */
+    public boolean poll(RequestFuture<?> future, long timeout) {
+        long now = time.milliseconds();
+        long deadline = now + timeout;
+        while (!future.isDone() && now < deadline) {
+            poll(deadline - now, now);
+            now = time.milliseconds();
+        }
+        return future.isDone();
+    }
+
+    /**
+     * Poll for any network IO. All send requests will either be transmitted on the network
+     * or failed when this call completes.
+     * @param timeout The maximum time to wait for an IO event.
+     * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     */
+    public void poll(long timeout) {
+        poll(timeout, time.milliseconds());
+    }
+
+    private void poll(long timeout, long now) {
+        // send all the requests we can send now
+        pollUnsentRequests(now);
+
+        // ensure we don't poll any longer than the deadline for
+        // the next scheduled task
+        timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
+        clientPoll(timeout, now);
+
+        // execute scheduled tasks
+        now = time.milliseconds();
+        delayedTasks.poll(now);
+
+        // try again to send requests since buffer space may have been
+        // cleared or a connect finished in the poll
+        pollUnsentRequests(now);
+
+        // fail all requests that couldn't be sent
+        clearUnsentRequests(now);
+
+    }
+
+    /**
+     * Block until all pending requests from the given node have finished.
+     * @param node The node to await requests from
+     */
+    public void awaitPendingRequests(Node node) {
+        while (pendingRequestCount(node) > 0)
+            poll(retryBackoffMs);
+    }
+
+    /**
+     * Get the count of pending requests to the given node. This includes both requests that
+     * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+     * @param node The node in question
+     * @return The number of pending requests
+     */
+    public int pendingRequestCount(Node node) {
+        List<ClientRequest> pending = unsent.get(node);
+        int unsentCount = pending == null ? 0 : pending.size();
+        return unsentCount + client.inFlightRequestCount(node.idString());
+    }
+
+    /**
+     * Get the total count of pending requests from all nodes. This includes both requests that
+     * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+     * @return The total count of pending requests
+     */
+    public int pendingRequestCount() {
+        int total = 0;
+        for (List<ClientRequest> requests: unsent.values())
+            total += requests.size();
+        return total + client.inFlightRequestCount();
+    }
+
+    private void pollUnsentRequests(long now) {
+        while (trySend(now))
+            clientPoll(0, now);
+    }
+
+    private void clearUnsentRequests(long now) {
+        // clear all unsent requests and fail their corresponding futures
+        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+            while (iterator.hasNext()) {
+                ClientRequest request = iterator.next();
+                RequestFutureCompletionHandler handler =
+                        (RequestFutureCompletionHandler) request.callback();
+                handler.raise(SendFailedException.INSTANCE);
+                iterator.remove();
+            }
+        }
+        unsent.clear();
+    }
+
+    private boolean trySend(long now) {
+        // send any requests that can be sent now
+        boolean requestsSent = false;
+        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+            Node node = requestEntry.getKey();
+            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+            while (iterator.hasNext()) {
+                ClientRequest request = iterator.next();
+                if (client.ready(node, now)) {
+                    client.send(request);
+                    iterator.remove();
+                    requestsSent = true;
+                } else if (client.connectionFailed(node)) {
+                    RequestFutureCompletionHandler handler =
+                            (RequestFutureCompletionHandler) request.callback();
+                    handler.onComplete(new ClientResponse(request, now, true, null));
+                    iterator.remove();
+                }
+            }
+        }
+        return requestsSent;
+    }
+
+    private void clientPoll(long timeout, long now) {
+        client.poll(timeout, now);
+        if (wakeup.get()) {
+            clearUnsentRequests(now);
+            wakeup.set(false);
+            throw new ConsumerWakeupException();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        client.close();
+    }
+
+    public static class RequestFutureCompletionHandler
+            extends RequestFuture<ClientResponse>
+            implements RequestCompletionHandler {
+
+        @Override
+        public void onComplete(ClientResponse response) {
+            complete(response);
+        }
+    }
+}
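
ConsumerNetworkClient centralizes the request/future plumbing that KafkaConsumer previously did inline (see the removed pollFuture/handleRequestFailure code above). A sketch of the intended internal usage, assuming the client, node, request, and time variables are in scope and that RequestFuture keeps the succeeded()/value()/exception() accessors used elsewhere in this patch:

    // send() only queues the request; poll() transmits it, runs any due
    // delayed tasks, and completes the future
    RequestFuture<ClientResponse> future = client.send(node, ApiKeys.OFFSET_COMMIT, request);
    client.poll(future); // blocks; throws ConsumerWakeupException on wakeup()

    if (future.succeeded()) {
        ClientResponse response = future.value();
        // parse and handle the response
    } else if (future.exception() instanceof SendFailedException) {
        // the request never left the unsent queue during this poll;
        // the caller decides whether to re-enqueue it
    }

    // periodic work (like the auto-commit task in KafkaConsumer) shares the
    // same poll loop via DelayedTask; a task re-arms itself by re-scheduling
    client.schedule(new DelayedTask() {
        @Override
        public void run(long now) {
            // invoked from within poll() once the scheduled time has passed
        }
    }, time.milliseconds() + 1000L);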