You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2021/06/28 20:12:34 UTC

[httpcomponents-client] branch 5.0.x updated: Fixed connection lease request cancellation race in both classic and async pooling connection managers

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch 5.0.x
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git


The following commit(s) were added to refs/heads/5.0.x by this push:
     new d1317da  Fixed connection lease request cancellation race in both classic and async pooling connection managers
d1317da is described below

commit d1317da724af5ab1db2a5963aef00b7ffcc76405
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sun Jun 20 21:39:18 2021 +0200

    Fixed connection lease request cancellation race in both classic and async pooling connection managers
---
 .../hc/client5/testing/async/TestHttp1Async.java   |  69 ++++++++
 .../testing/sync/TestClientRequestExecution.java   |  62 +++++++
 .../http/impl/classic/InternalExecRuntime.java     |  17 +-
 .../io/PoolingHttpClientConnectionManager.java     |  45 ++----
 .../nio/PoolingAsyncClientConnectionManager.java   | 180 ++++++++++++---------
 .../io/TestPoolingHttpClientConnectionManager.java |  22 ---
 6 files changed, 259 insertions(+), 136 deletions(-)

diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java
index cfa3749..d1d9e20 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java
@@ -28,7 +28,12 @@ package org.apache.hc.client5.testing.async;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
 import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
@@ -185,4 +190,68 @@ public class TestHttp1Async extends AbstractHttpAsyncFundamentalsTest<CloseableH
         Assert.assertThat(body3.length(), CoreMatchers.equalTo(2048));
     }
 
+    @Test
+    public void testRequestCancellation() throws Exception {
+        this.connManager.setDefaultMaxPerRoute(1);
+        this.connManager.setMaxTotal(1);
+
+        final HttpHost target = start();
+
+        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+        try {
+
+            for (int i = 0; i < 20; i++) {
+                final SimpleHttpRequest get = SimpleHttpRequests.get(target, "/random/1000");
+                final Future<SimpleHttpResponse> future = httpclient.execute(get, null);
+
+                executorService.schedule(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        future.cancel(true);
+                    }
+                }, i % 5, TimeUnit.MILLISECONDS);
+
+                try {
+                    future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+                } catch (final TimeoutException ex) {
+                    throw ex;
+                } catch (final Exception ignore) {
+                }
+            }
+
+            final Random rnd = new Random();
+            for (int i = 0; i < 20; i++) {
+                final SimpleHttpRequest get = SimpleHttpRequests.get(target, "/random/1000");
+                final Future<SimpleHttpResponse> future = httpclient.execute(get, null);
+
+                executorService.schedule(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        future.cancel(true);
+                    }
+                }, rnd.nextInt(200), TimeUnit.MILLISECONDS);
+
+                try {
+                    future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+                } catch (final TimeoutException ex) {
+                    throw ex;
+                } catch (final Exception ignore) {
+                }
+            }
+
+            for (int i = 0; i < 5; i++) {
+                final SimpleHttpRequest get = SimpleHttpRequests.get(target, "/random/1000");
+                final Future<SimpleHttpResponse> future = httpclient.execute(get, null);
+                final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+                Assert.assertThat(response, CoreMatchers.notNullValue());
+                Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
+            }
+
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java
index d9be97e..38e83ca 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java
@@ -29,6 +29,10 @@ package org.apache.hc.client5.testing.sync;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hc.client5.http.HttpRequestRetryStrategy;
 import org.apache.hc.client5.http.classic.methods.HttpGet;
@@ -291,4 +295,62 @@ public class TestClientRequestExecution extends LocalServerTestBase {
         Assert.assertEquals(uri, location);
     }
 
+    @Test
+    public void testRequestCancellation() throws Exception {
+        this.connManager.setDefaultMaxPerRoute(1);
+        this.connManager.setMaxTotal(1);
+
+        final HttpHost target = start();
+
+        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+        try {
+
+            for (int i = 0; i < 20; i++) {
+                final HttpGet httpget = new HttpGet("/random/1000");
+
+                executorService.schedule(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        httpget.cancel();
+                    }
+                }, 1, TimeUnit.MILLISECONDS);
+
+                try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
+                    EntityUtils.consume(response.getEntity());
+                } catch (final Exception ignore) {
+                }
+            }
+
+            final Random rnd = new Random();
+            for (int i = 0; i < 20; i++) {
+                final HttpGet httpget = new HttpGet("/random/1000");
+
+                executorService.schedule(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        httpget.cancel();
+                    }
+                }, rnd.nextInt(200), TimeUnit.MILLISECONDS);
+
+                try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
+                    EntityUtils.consume(response.getEntity());
+                } catch (final Exception ignore) {
+                }
+
+            }
+
+            for (int i = 0; i < 5; i++) {
+                final HttpGet httpget = new HttpGet("/random/1000");
+                try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
+                    EntityUtils.consume(response.getEntity());
+                }
+            }
+
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
+
 }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java
index 299f56d..272af02 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java
@@ -104,10 +104,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
             state = object;
             if (cancellableDependency != null) {
                 cancellableDependency.setDependency(connRequest);
-                if (cancellableDependency.isCancelled()) {
-                    connRequest.cancel();
-                    throw new RequestFailedException("Request aborted");
-                }
             }
             try {
                 final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
@@ -115,10 +111,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
                 reusable = connectionEndpoint.isConnected();
                 if (cancellableDependency != null) {
                     cancellableDependency.setDependency(this);
-                    if (cancellableDependency.isCancelled()) {
-                        cancel();
-                        throw new RequestFailedException("Request aborted");
-                    }
                 }
                 if (log.isDebugEnabled()) {
                     log.debug("{}: acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
@@ -155,10 +147,8 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
     }
 
     private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException {
-        if (cancellableDependency != null) {
-            if (cancellableDependency.isCancelled()) {
-                throw new RequestFailedException("Request aborted");
-            }
+        if (isExecutionAborted()) {
+            throw new RequestFailedException("Request aborted");
         }
         final RequestConfig requestConfig = context.getRequestConfig();
         final Timeout connectTimeout = requestConfig.getConnectTimeout();
@@ -208,6 +198,9 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
         if (!endpoint.isConnected()) {
             connectEndpoint(endpoint, context);
         }
+        if (isExecutionAborted()) {
+            throw new RequestFailedException("Request aborted");
+        }
         final RequestConfig requestConfig = context.getRequestConfig();
         final Timeout responseTimeout = requestConfig.getResponseTimeout();
         if (responseTimeout != null) {
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
index 29905a6..ae90703 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
@@ -28,7 +28,6 @@ package org.apache.hc.client5.http.impl.io;
 
 import java.io.IOException;
 import java.util.Set;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
@@ -95,9 +94,6 @@ import org.slf4j.LoggerFactory;
  * Total time to live (TTL) set at construction time defines maximum life span
  * of persistent connections regardless of their expiration setting. No persistent
  * connection will be re-used past its TTL value.
- * <p>
- * Please note in contrast to 4.x no stale check is employed by default.
- * @see #setValidateAfterInactivity(TimeValue)
  *
  * @since 4.3
  */
@@ -206,6 +202,7 @@ public class PoolingHttpClientConnectionManager
         }
         this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
         this.closed = new AtomicBoolean(false);
+        this.validateAfterInactivity = TimeValue.ofSeconds(2L);
     }
 
     @Internal
@@ -255,7 +252,7 @@ public class PoolingHttpClientConnectionManager
             final Object state) {
         Args.notNull(route, "HTTP route");
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{}: endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
+            LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
         }
         final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
         return new LeaseRequest() {
@@ -272,15 +269,12 @@ public class PoolingHttpClientConnectionManager
                 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
                 try {
                     poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
-                    if (poolEntry == null || leaseFuture.isCancelled()) {
-                        throw new ExecutionException(new CancellationException("Operation cancelled"));
-                    }
                 } catch (final TimeoutException ex) {
                     leaseFuture.cancel(true);
                     throw ex;
                 }
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("{}: endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
+                    LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
                 }
                 try {
                     final TimeValue validateAfterInactivitySnapshot = validateAfterInactivity;
@@ -296,7 +290,7 @@ public class PoolingHttpClientConnectionManager
                             }
                             if (stale) {
                                 if (LOG.isDebugEnabled()) {
-                                    LOG.debug("{}: connection {} is stale", id, ConnPoolSupport.getId(conn));
+                                    LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
                                 }
                                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
                             }
@@ -308,21 +302,14 @@ public class PoolingHttpClientConnectionManager
                     } else {
                         poolEntry.assignConnection(connFactory.createConnection(null));
                     }
-                    if (leaseFuture.isCancelled()) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{}: endpoint lease cancelled", id);
-                        }
-                        pool.release(poolEntry, false);
-                    } else {
-                        this.endpoint = new InternalConnectionEndpoint(poolEntry);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{}: acquired {}", id, ConnPoolSupport.getId(endpoint));
-                        }
+                    this.endpoint = new InternalConnectionEndpoint(poolEntry);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
                     }
                     return this.endpoint;
                 } catch (final Exception ex) {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("{}: endpoint lease failed", id);
+                        LOG.debug("{} endpoint lease failed", id);
                     }
                     pool.release(poolEntry, false);
                     throw new ExecutionException(ex.getMessage(), ex);
@@ -346,7 +333,7 @@ public class PoolingHttpClientConnectionManager
             return;
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{}: releasing endpoint", ConnPoolSupport.getId(endpoint));
+            LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
         }
         final ManagedHttpClientConnection conn = entry.getConnection();
         if (conn != null && keepAlive == null) {
@@ -365,11 +352,11 @@ public class PoolingHttpClientConnectionManager
                     } else {
                         s = "indefinitely";
                     }
-                    LOG.debug("{}: connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn), s);
+                    LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn), s);
                 }
             } else {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("{}: connection is not kept alive", ConnPoolSupport.getId(endpoint));
+                    LOG.debug("{} connection is not kept alive", ConnPoolSupport.getId(endpoint));
                 }
             }
         } catch (final RuntimeException ex) {
@@ -378,7 +365,7 @@ public class PoolingHttpClientConnectionManager
         } finally {
             this.pool.release(entry, reusable);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("{}: connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
+                LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
             }
         }
     }
@@ -402,7 +389,7 @@ public class PoolingHttpClientConnectionManager
             host = route.getTargetHost();
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{}: connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
+            LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
         }
         final ManagedHttpClientConnection conn = poolEntry.getConnection();
         final SocketConfig defaultSocketConfigSnapshot = defaultSocketConfig;
@@ -414,7 +401,7 @@ public class PoolingHttpClientConnectionManager
                 defaultSocketConfigSnapshot != null ? defaultSocketConfigSnapshot : SocketConfig.DEFAULT,
                 context);
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{}: connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
+            LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
         }
     }
 
@@ -526,7 +513,7 @@ public class PoolingHttpClientConnectionManager
         InternalConnectionEndpoint(
                 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry) {
             this.poolEntryRef = new AtomicReference<>(poolEntry);
-            this.id = String.format("ep-%08X", COUNT.getAndIncrement());
+            this.id = String.format("ep-%010d", COUNT.getAndIncrement());
         }
 
         @Override
@@ -591,7 +578,7 @@ public class PoolingHttpClientConnectionManager
             Args.notNull(requestExecutor, "Request executor");
             final ManagedHttpClientConnection connection = getValidatedPoolEntry().getConnection();
             if (LOG.isDebugEnabled()) {
-                LOG.debug("{}: executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
+                LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
             }
             return requestExecutor.execute(request, connection, context);
         }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
index 059debf..9d1250e 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
@@ -29,7 +29,10 @@ package org.apache.hc.client5.http.impl.nio;
 
 import java.net.InetSocketAddress;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -47,6 +50,7 @@ import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.Internal;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.BasicFuture;
 import org.apache.hc.core5.concurrent.ComplexFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Callback;
@@ -219,84 +223,114 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
             final Timeout requestTimeout,
             final FutureCallback<AsyncConnectionEndpoint> callback) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{}: endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
+            LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
         }
-        final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
-        final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
-                route, state, requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
-
-                    void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
-                        final ManagedAsyncClientConnection connection = poolEntry.getConnection();
-                        if (connection != null) {
-                            connection.activate();
-                        }
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{}: endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
-                        }
-                        final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{}: acquired {}", id, ConnPoolSupport.getId(endpoint));
-                        }
-                        resultFuture.completed(endpoint);
-                    }
-
-                    @Override
-                    public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
-                        final ManagedAsyncClientConnection connection = poolEntry.getConnection();
-                        if (connection != null) {
-                            if (connection.isOpen()) {
-                                final ProtocolVersion protocolVersion = connection.getProtocolVersion();
-                                if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
-                                    final TimeValue timeValue = PoolingAsyncClientConnectionManager.this.validateAfterInactivity;
-                                    if (TimeValue.isNonNegative(timeValue) &&
-                                            poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
-                                        connection.submitCommand(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
-
-                                            @Override
-                                            public void execute(final Boolean result) {
-                                                if (result == null || !result) {
-                                                    if (LOG.isDebugEnabled()) {
-                                                        LOG.debug("{}: connection {} is stale", id, ConnPoolSupport.getId(connection));
+        return new Future<AsyncConnectionEndpoint>() {
+
+            final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
+
+            final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
+                    route,
+                    state,
+                    requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
+
+                        @Override
+                        public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
+                            final ManagedAsyncClientConnection connection = poolEntry.getConnection();
+                            if (connection != null) {
+                                if (connection.isOpen()) {
+                                    final ProtocolVersion protocolVersion = connection.getProtocolVersion();
+                                    if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
+                                        final TimeValue timeValue = PoolingAsyncClientConnectionManager.this.validateAfterInactivity;
+                                        if (TimeValue.isNonNegative(timeValue) &&
+                                                poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
+                                            connection.submitCommand(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
+
+                                                @Override
+                                                public void execute(final Boolean result) {
+                                                    if (result == null || !result) {
+                                                        if (LOG.isDebugEnabled()) {
+                                                            LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
+                                                        }
+                                                        poolEntry.discardConnection(CloseMode.IMMEDIATE);
                                                     }
-                                                    poolEntry.discardConnection(CloseMode.IMMEDIATE);
+                                                    leaseCompleted(poolEntry);
                                                 }
-                                                leaseCompleted(poolEntry);
-                                            }
 
-                                        })), Command.Priority.IMMEDIATE);
-                                        return;
+                                            })), Command.Priority.IMMEDIATE);
+                                            return;
+                                        }
                                     }
+                                } else {
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
+                                    }
+                                    poolEntry.discardConnection(CloseMode.IMMEDIATE);
                                 }
-                            } else {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("{}: connection {} is closed", id, ConnPoolSupport.getId(connection));
-                                }
-                                poolEntry.discardConnection(CloseMode.IMMEDIATE);
                             }
+                            leaseCompleted(poolEntry);
                         }
-                        leaseCompleted(poolEntry);
-                    }
 
-                    @Override
-                    public void failed(final Exception ex) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{}: endpoint lease failed", id);
+                        void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
+                            final ManagedAsyncClientConnection connection = poolEntry.getConnection();
+                            if (connection != null) {
+                                connection.activate();
+                            }
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
+                            }
+                            final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
+                            }
+                            resultFuture.completed(endpoint);
                         }
-                        resultFuture.failed(ex);
-                    }
 
-                    @Override
-                    public void cancelled() {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{}: endpoint lease cancelled", id);
+                        @Override
+                        public void failed(final Exception ex) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("{} endpoint lease failed", id);
+                            }
+                            resultFuture.failed(ex);
                         }
-                        resultFuture.cancel();
-                    }
 
-                });
+                        @Override
+                        public void cancelled() {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("{} endpoint lease cancelled", id);
+                            }
+                            resultFuture.cancel();
+                        }
 
-        resultFuture.setDependency(leaseFuture);
-        return resultFuture;
+                    });
+
+            @Override
+            public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
+                return resultFuture.get();
+            }
+
+            @Override
+            public AsyncConnectionEndpoint get(
+                    final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+                return resultFuture.get(timeout, unit);
+            }
+
+            @Override
+            public boolean cancel(final boolean mayInterruptIfRunning) {
+                return leaseFuture.cancel(mayInterruptIfRunning);
+            }
+
+            @Override
+            public boolean isDone() {
+                return resultFuture.isDone();
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return resultFuture.isCancelled();
+            }
+
+        };
     }
 
     @Override
@@ -308,7 +342,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
             return;
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{}: releasing endpoint", ConnPoolSupport.getId(endpoint));
+            LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
         }
         final ManagedAsyncClientConnection connection = entry.getConnection();
         boolean reusable = connection != null && connection.isOpen();
@@ -324,7 +358,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
                     } else {
                         s = "indefinitely";
                     }
-                    LOG.debug("{}: connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
+                    LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
                 }
             }
         } catch (final RuntimeException ex) {
@@ -333,7 +367,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
         } finally {
             pool.release(entry, reusable);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("{}: connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
+                LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
             }
         }
     }
@@ -365,7 +399,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
         }
         final InetSocketAddress localAddress = route.getLocalSocketAddress();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{}: connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
+            LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
         }
         final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
                 connectionInitiator, host, localAddress, connectTimeout, attachment, new FutureCallback<ManagedAsyncClientConnection>() {
@@ -374,7 +408,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
                     public void completed(final ManagedAsyncClientConnection connection) {
                         try {
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug("{}: connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
+                                LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
                             }
                             poolEntry.assignConnection(connection);
                             resultFuture.completed(internalEndpoint);
@@ -410,7 +444,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
         final ManagedAsyncClientConnection connection = poolEntry.getConnection();
         connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{}: upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
+            LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
         }
     }
 
@@ -493,7 +527,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
 
         InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
             this.poolEntryRef = new AtomicReference<>(poolEntry);
-            this.id = String.format("ep-%08X", COUNT.getAndIncrement());
+            this.id = String.format("ep-%010d", COUNT.getAndIncrement());
         }
 
         @Override
@@ -525,7 +559,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
             if (poolEntry != null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("{}: close {}", id, closeMode);
+                    LOG.debug("{} close {}", id, closeMode);
                 }
                 poolEntry.discardConnection(closeMode);
             }
@@ -561,7 +595,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
                 final HttpContext context) {
             final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
             if (LOG.isDebugEnabled()) {
-                LOG.debug("{}: executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
+                LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
             }
             connection.submitCommand(
                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java
index 7945dd1..1533880 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java
@@ -30,7 +30,6 @@ package org.apache.hc.client5.http.impl.io;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -159,27 +158,6 @@ public class TestPoolingHttpClientConnectionManager {
         Mockito.verify(pool).release(entry, false);
     }
 
-    @Test(expected= ExecutionException.class)
-    public void testLeaseFutureCancelled() throws Exception {
-        final HttpHost target = new HttpHost("localhost", 80);
-        final HttpRoute route = new HttpRoute(target);
-
-        final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
-        entry.assignConnection(conn);
-
-        Mockito.when(future.isCancelled()).thenReturn(Boolean.TRUE);
-        Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
-        Mockito.when(pool.lease(
-                Mockito.eq(route),
-                Mockito.eq(null),
-                Mockito.<Timeout>any(),
-                Mockito.<FutureCallback<PoolEntry<HttpRoute, ManagedHttpClientConnection>>>eq(null)))
-                .thenReturn(future);
-
-        final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
-        connRequest1.get(Timeout.ofSeconds(1));
-    }
-
     @Test(expected=TimeoutException.class)
     public void testLeaseFutureTimeout() throws Exception {
         final HttpHost target = new HttpHost("localhost", 80);