You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/01 19:08:04 UTC

[kafka] branch 1.1 updated: KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync (#4625)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new ca027f6  KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync (#4625)
ca027f6 is described below

commit ca027f67a5a2d611b343386df5e47cc6f8f1448a
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Mar 1 11:04:11 2018 -0800

    KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync (#4625)
    
    Contention for the lock in ConsumerNetworkClient can lead to a livelock situation in which an active commitSync is unable to make progress because its completion is blocked in the heartbeat thread. The fix is twofold:
    
    1) We change ConsumerNetworkClient to use a fair lock to reduce the chance of either thread being starved.
    2) We eliminate the dependence on the lock in ConsumerNetworkClient for callback completion so that callbacks will not be blocked by an active poll().
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java    | 48 ++++++-----
 .../consumer/internals/ConsumerCoordinator.java    |  8 +-
 .../consumer/internals/ConsumerNetworkClient.java  | 99 +++++++++++++++++-----
 .../java/org/apache/kafka/clients/MockClient.java  | 38 ++++++++-
 .../internals/ConsumerCoordinatorTest.java         | 11 ++-
 .../internals/ConsumerNetworkClientTest.java       | 66 ++++++++++++++-
 6 files changed, 213 insertions(+), 57 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6884ff0..2daaddd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -231,7 +231,7 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (coordinator != null && client.connectionFailed(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
-                coordinatorDead();
+                markCoordinatorUnknown();
                 time.sleep(retryBackoffMs);
             }
 
@@ -487,7 +487,7 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry with backoff
-                coordinatorDead();
+                markCoordinatorUnknown();
                 log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
                 future.raise(error);
             } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
@@ -550,7 +550,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(new GroupAuthorizationException(groupId));
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                    log.debug("SyncGroup failed due to group rebalance");
+                    log.debug("SyncGroup failed because the group began another rebalance");
                     future.raise(error);
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
                         || error == Errors.ILLEGAL_GENERATION) {
@@ -559,8 +559,8 @@ public abstract class AbstractCoordinator implements Closeable {
                     future.raise(error);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR) {
-                    log.debug("SyncGroup failed:", error.message());
-                    coordinatorDead();
+                    log.debug("SyncGroup failed: {}", error.message());
+                    markCoordinatorUnknown();
                     future.raise(error);
                 } else {
                     future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
@@ -627,27 +627,34 @@ public abstract class AbstractCoordinator implements Closeable {
      * @return true if the coordinator is unknown
      */
     public boolean coordinatorUnknown() {
-        return coordinator() == null;
+        return checkAndGetCoordinator() == null;
     }
 
     /**
-     * Get the current coordinator
+     * Get the coordinator if its connection is still active. Otherwise mark it unknown and
+     * return null.
+     *
      * @return the current coordinator or null if it is unknown
      */
-    protected synchronized Node coordinator() {
+    protected synchronized Node checkAndGetCoordinator() {
         if (coordinator != null && client.connectionFailed(coordinator)) {
-            coordinatorDead();
+            markCoordinatorUnknown(true);
             return null;
         }
         return this.coordinator;
     }
 
-    /**
-     * Mark the current coordinator as dead.
-     */
-    protected synchronized void coordinatorDead() {
+    private synchronized Node coordinator() {
+        return this.coordinator;
+    }
+
+    protected synchronized void markCoordinatorUnknown() {
+        markCoordinatorUnknown(false);
+    }
+
+    protected synchronized void markCoordinatorUnknown(boolean isDisconnected) {
         if (this.coordinator != null) {
-            log.info("Marking the coordinator {} dead", this.coordinator);
+            log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator);
             Node oldCoordinator = this.coordinator;
 
             // Mark the coordinator dead before disconnecting requests since the callbacks for any pending
@@ -656,8 +663,9 @@ public abstract class AbstractCoordinator implements Closeable {
             this.coordinator = null;
 
             // Disconnect from the coordinator to ensure that there are no in-flight requests remaining.
-            // Pending callbacks will be invoked with a DisconnectException.
-            client.disconnect(oldCoordinator);
+            // Pending callbacks will be invoked with a DisconnectException on the next call to poll.
+            if (!isDisconnected)
+                client.disconnectAsync(oldCoordinator);
         }
     }
 
@@ -708,7 +716,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 // interrupted using wakeup) and the leave group request which have been queued, but not
                 // yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
                 // If coordinator is not known, requests are aborted.
-                Node coordinator = coordinator();
+                Node coordinator = checkAndGetCoordinator();
                 if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
                     log.warn("Close timed out with {} pending requests to coordinator, terminating client connections",
                             client.pendingRequestCount(coordinator));
@@ -769,7 +777,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     || error == Errors.NOT_COORDINATOR) {
                 log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
                         coordinator());
-                coordinatorDead();
+                markCoordinatorUnknown();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                 log.debug("Attempt to heartbeat failed since group is rebalancing");
@@ -800,7 +808,7 @@ public abstract class AbstractCoordinator implements Closeable {
         public void onFailure(RuntimeException e, RequestFuture<T> future) {
             // mark the coordinator as dead
             if (e instanceof DisconnectException) {
-                coordinatorDead();
+                markCoordinatorUnknown(true);
             }
             future.raise(e);
         }
@@ -948,7 +956,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         } else if (heartbeat.sessionTimeoutExpired(now)) {
                             // the session timeout has expired without seeing a successful heartbeat, so we should
                             // probably make sure the coordinator is still healthy.
-                            coordinatorDead();
+                            markCoordinatorUnknown();
                         } else if (heartbeat.pollTimeoutExpired(now)) {
                             // the poll timeout has expired, which means that the foreground thread has stalled
                             // in between calls to poll(), so we explicitly leave the group.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 0aa61bb..2afa1ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -684,7 +684,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (offsets.isEmpty())
             return RequestFuture.voidSuccess();
 
-        Node coordinator = coordinator();
+        Node coordinator = checkAndGetCoordinator();
         if (coordinator == null)
             return RequestFuture.coordinatorNotAvailable();
 
@@ -762,7 +762,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                             || error == Errors.NOT_COORDINATOR
                             || error == Errors.REQUEST_TIMED_OUT) {
-                        coordinatorDead();
+                        markCoordinatorUnknown();
                         future.raise(error);
                         return;
                     } else if (error == Errors.UNKNOWN_MEMBER_ID
@@ -799,7 +799,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * @return A request future containing the committed offsets.
      */
     private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
-        Node coordinator = coordinator();
+        Node coordinator = checkAndGetCoordinator();
         if (coordinator == null)
             return RequestFuture.coordinatorNotAvailable();
 
@@ -825,7 +825,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     future.raise(error);
                 } else if (error == Errors.NOT_COORDINATOR) {
                     // re-discover the coordinator and retry
-                    coordinatorDead();
+                    markCoordinatorUnknown();
                     future.raise(error);
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(new GroupAuthorizationException(groupId));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 0747e8d..fb393a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Higher level consumer access to the network layer with basic support for request futures. This class
@@ -65,10 +66,15 @@ public class ConsumerNetworkClient implements Closeable {
     private final long unsentExpiryMs;
     private final AtomicBoolean wakeupDisabled = new AtomicBoolean();
 
+    // We do not need high throughput, so use a fair lock to try to avoid starvation
+    private final ReentrantLock lock = new ReentrantLock(true);
+
     // when requests complete, they are transferred to this queue prior to invocation. The purpose
     // is to avoid invoking them while holding this object's monitor which can open the door for deadlocks.
     private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();
 
+    private final ConcurrentLinkedQueue<Node> pendingDisconnects = new ConcurrentLinkedQueue<>();
+
     // this flag allows the client to be safely woken up without waiting on the lock above. It is
     // atomic to avoid the need to acquire the lock above in order to enable it concurrently.
     private final AtomicBoolean wakeup = new AtomicBoolean(false);
@@ -113,12 +119,22 @@ public class ConsumerNetworkClient implements Closeable {
         return completionHandler.future;
     }
 
-    public synchronized Node leastLoadedNode() {
-        return client.leastLoadedNode(time.milliseconds());
+    public Node leastLoadedNode() {
+        lock.lock();
+        try {
+            return client.leastLoadedNode(time.milliseconds());
+        } finally {
+            lock.unlock();
+        }
     }
 
-    public synchronized boolean hasReadyNodes() {
-        return client.hasReadyNodes();
+    public boolean hasReadyNodes() {
+        lock.lock();
+        try {
+            return client.hasReadyNodes();
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
@@ -227,14 +243,18 @@ public class ConsumerNetworkClient implements Closeable {
         // there may be handlers which need to be invoked if we woke up the previous call to poll
         firePendingCompletedRequests();
 
-        synchronized (this) {
+        lock.lock();
+        try {
+            // Handle async disconnects prior to attempting any sends
+            handlePendingDisconnects();
+
             // send all the requests we can send now
             trySend(now);
 
             // check whether the poll is still needed by the caller. Note that if the expected completion
             // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
             // handler), the client will be woken up.
-            if (pollCondition == null || pollCondition.shouldBlock()) {
+            if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
                 // if there are no requests in flight, do not block longer than the retry backoff
                 if (client.inFlightRequestCount() == 0)
                     timeout = Math.min(timeout, retryBackoffMs);
@@ -265,6 +285,8 @@ public class ConsumerNetworkClient implements Closeable {
 
             // clean unsent requests collection to keep the map from growing indefinitely
             unsent.clean();
+        } finally {
+            lock.unlock();
         }
 
         // called without the lock to avoid deadlock potential if handlers need to acquire locks
@@ -303,8 +325,11 @@ public class ConsumerNetworkClient implements Closeable {
      * @return The number of pending requests
      */
     public int pendingRequestCount(Node node) {
-        synchronized (this) {
+        lock.lock();
+        try {
             return unsent.requestCount(node) + client.inFlightRequestCount(node.idString());
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -317,8 +342,11 @@ public class ConsumerNetworkClient implements Closeable {
     public boolean hasPendingRequests(Node node) {
         if (unsent.hasRequests(node))
             return true;
-        synchronized (this) {
+        lock.lock();
+        try {
             return client.hasInFlightRequests(node.idString());
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -328,8 +356,11 @@ public class ConsumerNetworkClient implements Closeable {
      * @return The total count of pending requests
      */
     public int pendingRequestCount() {
-        synchronized (this) {
+        lock.lock();
+        try {
             return unsent.requestCount() + client.inFlightRequestCount();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -341,8 +372,11 @@ public class ConsumerNetworkClient implements Closeable {
     public boolean hasPendingRequests() {
         if (unsent.hasRequests())
             return true;
-        synchronized (this) {
+        lock.lock();
+        try {
             return client.hasInFlightRequests();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -386,15 +420,25 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
-    public void disconnect(Node node) {
-        failUnsentRequests(node, DisconnectException.INSTANCE);
+    private void handlePendingDisconnects() {
+        lock.lock();
+        try {
+            while (true) {
+                Node node = pendingDisconnects.poll();
+                if (node == null)
+                    break;
 
-        synchronized (this) {
-            client.disconnect(node.idString());
+                failUnsentRequests(node, DisconnectException.INSTANCE);
+                client.disconnect(node.idString());
+            }
+        } finally {
+            lock.unlock();
         }
+    }
 
-        // We need to poll to ensure callbacks from in-flight requests on the disconnected socket are fired
-        pollNoWakeup();
+    public void disconnectAsync(Node node) {
+        pendingDisconnects.offer(node);
+        client.wakeup();
     }
 
     private void failExpiredRequests(long now) {
@@ -408,16 +452,16 @@ public class ConsumerNetworkClient implements Closeable {
 
     private void failUnsentRequests(Node node, RuntimeException e) {
         // clear unsent requests to node and fail their corresponding futures
-        synchronized (this) {
+        lock.lock();
+        try {
             Collection<ClientRequest> unsentRequests = unsent.remove(node);
             for (ClientRequest unsentRequest : unsentRequests) {
                 RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback();
                 handler.onFailure(e);
             }
+        } finally {
+            lock.unlock();
         }
-
-        // called without the lock to avoid deadlock potential
-        firePendingCompletedRequests();
     }
 
     private boolean trySend(long now) {
@@ -458,8 +502,11 @@ public class ConsumerNetworkClient implements Closeable {
 
     @Override
     public void close() throws IOException {
-        synchronized (this) {
+        lock.lock();
+        try {
             client.close();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -469,8 +516,11 @@ public class ConsumerNetworkClient implements Closeable {
      * @param node Node to connect to if possible
      */
     public boolean connectionFailed(Node node) {
-        synchronized (this) {
+        lock.lock();
+        try {
             return client.connectionFailed(node);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -481,8 +531,11 @@ public class ConsumerNetworkClient implements Closeable {
      * @param node The node to connect to
      */
     public void tryConnect(Node node) {
-        synchronized (this) {
+        lock.lock();
+        try {
             client.ready(node, time.milliseconds());
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 151b7a8..ee03714 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -84,6 +85,7 @@ public class MockClient implements KafkaClient {
     private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
     private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+    private volatile int numBlockingWakeups = 0;
 
     public MockClient(Time time) {
         this(time, null);
@@ -187,8 +189,40 @@ public class MockClient implements KafkaClient {
         this.requests.add(request);
     }
 
+    /**
+     * Simulate a blocking poll in order to test wakeup behavior.
+     *
+     * @param numBlockingWakeups The number of polls which will block until woken up
+     */
+    public synchronized void enableBlockingUntilWakeup(int numBlockingWakeups) {
+        this.numBlockingWakeups = numBlockingWakeups;
+    }
+
+    @Override
+    public synchronized void wakeup() {
+        if (numBlockingWakeups > 0) {
+            numBlockingWakeups--;
+            notify();
+        }
+    }
+
+    private synchronized void maybeAwaitWakeup() {
+        try {
+            int remainingBlockingWakeups = numBlockingWakeups;
+            if (remainingBlockingWakeups <= 0)
+                return;
+
+            while (numBlockingWakeups == remainingBlockingWakeups)
+                wait();
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+    }
+
     @Override
     public List<ClientResponse> poll(long timeoutMs, long now) {
+        maybeAwaitWakeup();
+
         List<ClientResponse> copy = new ArrayList<>(this.responses);
 
         if (metadata != null && metadata.updateRequested()) {
@@ -419,10 +453,6 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public void wakeup() {
-    }
-
-    @Override
     public void close() {
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 84cff28..ffb409c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -220,7 +220,8 @@ public class ConsumerCoordinatorTest {
             });
         }
 
-        coordinator.coordinatorDead();
+        coordinator.markCoordinatorUnknown();
+        consumerClient.pollNoWakeup();
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(numRequests, responses.get());
     }
@@ -237,7 +238,7 @@ public class ConsumerCoordinatorTest {
         final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
         Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = Collections.singletonMap(
                 new TopicPartition("foo", 0), new OffsetCommitRequest.PartitionData(13L, ""));
-        consumerClient.send(coordinator.coordinator(), new OffsetCommitRequest.Builder(groupId, offsets))
+        consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(groupId, offsets))
                 .compose(new RequestFutureAdapter<ClientResponse, Object>() {
                     @Override
                     public void onSuccess(ClientResponse value, RequestFuture<Object> future) {}
@@ -250,7 +251,8 @@ public class ConsumerCoordinatorTest {
                     }
                 });
 
-        coordinator.coordinatorDead();
+        coordinator.markCoordinatorUnknown();
+        consumerClient.pollNoWakeup();
         assertTrue(asyncCallbackInvoked.get());
     }
 
@@ -1032,6 +1034,7 @@ public class ConsumerCoordinatorTest {
 
         client.respond(offsetCommitResponse(Collections.singletonMap(t1p, error)));
         consumerClient.pollNoWakeup();
+        consumerClient.pollNoWakeup(); // second poll since coordinator disconnect is async
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -1655,7 +1658,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(Collections.singleton(t1p));
         subscriptions.seek(t1p, 100L);
 
-        coordinator.coordinatorDead();
+        coordinator.markCoordinatorUnknown();
         assertTrue(coordinator.coordinatorUnknown());
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 93c6acd..73b96cb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -88,7 +88,8 @@ public class ConsumerNetworkClientTest {
         RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
         assertTrue(consumerClient.hasPendingRequests(node));
         assertFalse(client.hasInFlightRequests(node.idString()));
-        consumerClient.disconnect(node);
+        consumerClient.disconnectAsync(node);
+        consumerClient.pollNoWakeup();
         assertTrue(future.failed());
         assertTrue(future.exception() instanceof DisconnectException);
     }
@@ -99,7 +100,8 @@ public class ConsumerNetworkClientTest {
         consumerClient.pollNoWakeup();
         assertTrue(consumerClient.hasPendingRequests(node));
         assertTrue(client.hasInFlightRequests(node.idString()));
-        consumerClient.disconnect(node);
+        consumerClient.disconnectAsync(node);
+        consumerClient.pollNoWakeup();
         assertTrue(future.failed());
         assertTrue(future.exception() instanceof DisconnectException);
     }
@@ -187,6 +189,66 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
+    public void testDisconnectWakesUpPoll() throws Exception {
+        final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+
+        client.enableBlockingUntilWakeup(1);
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                consumerClient.poll(future);
+            }
+        };
+        t.start();
+
+        consumerClient.disconnectAsync(node);
+        t.join();
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof DisconnectException);
+    }
+
+    @Test
+    public void testFutureCompletionOutsidePoll() throws Exception {
+        // Tests the scenario in which the request that is being awaited in one thread
+        // is received and completed in another thread.
+
+        final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+        consumerClient.pollNoWakeup(); // dequeue and send the request
+
+        client.enableBlockingUntilWakeup(2);
+        Thread t1 = new Thread() {
+            @Override
+            public void run() {
+                consumerClient.pollNoWakeup();
+            }
+        };
+        t1.start();
+
+        // Sleep a little so that t1 is blocking in poll
+        Thread.sleep(50);
+
+        Thread t2 = new Thread() {
+            @Override
+            public void run() {
+                consumerClient.poll(future);
+            }
+        };
+        t2.start();
+
+        // Sleep a little so that t2 is awaiting the network client lock
+        Thread.sleep(50);
+
+        // Simulate a network response and return from the poll in t1
+        client.respond(heartbeatResponse(Errors.NONE));
+        client.wakeup();
+
+        // Both threads should complete since t1 should wakeup t2
+        t1.join();
+        t2.join();
+        assertTrue(future.succeeded());
+    }
+
+    @Test
     public void testAwaitForMetadataUpdateWithTimeout() {
         assertFalse(consumerClient.awaitMetadataUpdate(10L));
     }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.