You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2021/06/28 20:12:34 UTC
[httpcomponents-client] branch 5.0.x updated: Fixed connection
lease request cancellation race in both classic and asyc pooling connection
managers
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch 5.0.x
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git
The following commit(s) were added to refs/heads/5.0.x by this push:
new d1317da Fixed connection lease request cancellation race in both classic and asyc pooling connection managers
d1317da is described below
commit d1317da724af5ab1db2a5963aef00b7ffcc76405
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sun Jun 20 21:39:18 2021 +0200
Fixed connection lease request cancellation race in both classic and asyc pooling connection managers
---
.../hc/client5/testing/async/TestHttp1Async.java | 69 ++++++++
.../testing/sync/TestClientRequestExecution.java | 62 +++++++
.../http/impl/classic/InternalExecRuntime.java | 17 +-
.../io/PoolingHttpClientConnectionManager.java | 45 ++----
.../nio/PoolingAsyncClientConnectionManager.java | 180 ++++++++++++---------
.../io/TestPoolingHttpClientConnectionManager.java | 22 ---
6 files changed, 259 insertions(+), 136 deletions(-)
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java
index cfa3749..d1d9e20 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java
@@ -28,7 +28,12 @@ package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
@@ -185,4 +190,68 @@ public class TestHttp1Async extends AbstractHttpAsyncFundamentalsTest<CloseableH
Assert.assertThat(body3.length(), CoreMatchers.equalTo(2048));
}
+ @Test
+ public void testRequestCancellation() throws Exception {
+ this.connManager.setDefaultMaxPerRoute(1);
+ this.connManager.setMaxTotal(1);
+
+ final HttpHost target = start();
+
+ final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ try {
+
+ for (int i = 0; i < 20; i++) {
+ final SimpleHttpRequest get = SimpleHttpRequests.get(target, "/random/1000");
+ final Future<SimpleHttpResponse> future = httpclient.execute(get, null);
+
+ executorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ future.cancel(true);
+ }
+ }, i % 5, TimeUnit.MILLISECONDS);
+
+ try {
+ future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ } catch (final TimeoutException ex) {
+ throw ex;
+ } catch (final Exception ignore) {
+ }
+ }
+
+ final Random rnd = new Random();
+ for (int i = 0; i < 20; i++) {
+ final SimpleHttpRequest get = SimpleHttpRequests.get(target, "/random/1000");
+ final Future<SimpleHttpResponse> future = httpclient.execute(get, null);
+
+ executorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ future.cancel(true);
+ }
+ }, rnd.nextInt(200), TimeUnit.MILLISECONDS);
+
+ try {
+ future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ } catch (final TimeoutException ex) {
+ throw ex;
+ } catch (final Exception ignore) {
+ }
+ }
+
+ for (int i = 0; i < 5; i++) {
+ final SimpleHttpRequest get = SimpleHttpRequests.get(target, "/random/1000");
+ final Future<SimpleHttpResponse> future = httpclient.execute(get, null);
+ final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(response, CoreMatchers.notNullValue());
+ Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
+ }
+
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
}
\ No newline at end of file
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java
index d9be97e..38e83ca 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java
@@ -29,6 +29,10 @@ package org.apache.hc.client5.testing.sync;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.client5.http.classic.methods.HttpGet;
@@ -291,4 +295,62 @@ public class TestClientRequestExecution extends LocalServerTestBase {
Assert.assertEquals(uri, location);
}
+ @Test
+ public void testRequestCancellation() throws Exception {
+ this.connManager.setDefaultMaxPerRoute(1);
+ this.connManager.setMaxTotal(1);
+
+ final HttpHost target = start();
+
+ final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ try {
+
+ for (int i = 0; i < 20; i++) {
+ final HttpGet httpget = new HttpGet("/random/1000");
+
+ executorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ httpget.cancel();
+ }
+ }, 1, TimeUnit.MILLISECONDS);
+
+ try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
+ EntityUtils.consume(response.getEntity());
+ } catch (final Exception ignore) {
+ }
+ }
+
+ final Random rnd = new Random();
+ for (int i = 0; i < 20; i++) {
+ final HttpGet httpget = new HttpGet("/random/1000");
+
+ executorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ httpget.cancel();
+ }
+ }, rnd.nextInt(200), TimeUnit.MILLISECONDS);
+
+ try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
+ EntityUtils.consume(response.getEntity());
+ } catch (final Exception ignore) {
+ }
+
+ }
+
+ for (int i = 0; i < 5; i++) {
+ final HttpGet httpget = new HttpGet("/random/1000");
+ try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
+ EntityUtils.consume(response.getEntity());
+ }
+ }
+
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java
index 299f56d..272af02 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java
@@ -104,10 +104,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
state = object;
if (cancellableDependency != null) {
cancellableDependency.setDependency(connRequest);
- if (cancellableDependency.isCancelled()) {
- connRequest.cancel();
- throw new RequestFailedException("Request aborted");
- }
}
try {
final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
@@ -115,10 +111,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
reusable = connectionEndpoint.isConnected();
if (cancellableDependency != null) {
cancellableDependency.setDependency(this);
- if (cancellableDependency.isCancelled()) {
- cancel();
- throw new RequestFailedException("Request aborted");
- }
}
if (log.isDebugEnabled()) {
log.debug("{}: acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
@@ -155,10 +147,8 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
}
private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException {
- if (cancellableDependency != null) {
- if (cancellableDependency.isCancelled()) {
- throw new RequestFailedException("Request aborted");
- }
+ if (isExecutionAborted()) {
+ throw new RequestFailedException("Request aborted");
}
final RequestConfig requestConfig = context.getRequestConfig();
final Timeout connectTimeout = requestConfig.getConnectTimeout();
@@ -208,6 +198,9 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
if (!endpoint.isConnected()) {
connectEndpoint(endpoint, context);
}
+ if (isExecutionAborted()) {
+ throw new RequestFailedException("Request aborted");
+ }
final RequestConfig requestConfig = context.getRequestConfig();
final Timeout responseTimeout = requestConfig.getResponseTimeout();
if (responseTimeout != null) {
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
index 29905a6..ae90703 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
@@ -28,7 +28,6 @@ package org.apache.hc.client5.http.impl.io;
import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
@@ -95,9 +94,6 @@ import org.slf4j.LoggerFactory;
* Total time to live (TTL) set at construction time defines maximum life span
* of persistent connections regardless of their expiration setting. No persistent
* connection will be re-used past its TTL value.
- * <p>
- * Please note in contrast to 4.x no stale check is employed by default.
- * @see #setValidateAfterInactivity(TimeValue)
*
* @since 4.3
*/
@@ -206,6 +202,7 @@ public class PoolingHttpClientConnectionManager
}
this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
this.closed = new AtomicBoolean(false);
+ this.validateAfterInactivity = TimeValue.ofSeconds(2L);
}
@Internal
@@ -255,7 +252,7 @@ public class PoolingHttpClientConnectionManager
final Object state) {
Args.notNull(route, "HTTP route");
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
+ LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
}
final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
return new LeaseRequest() {
@@ -272,15 +269,12 @@ public class PoolingHttpClientConnectionManager
final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
try {
poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
- if (poolEntry == null || leaseFuture.isCancelled()) {
- throw new ExecutionException(new CancellationException("Operation cancelled"));
- }
} catch (final TimeoutException ex) {
leaseFuture.cancel(true);
throw ex;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
+ LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
}
try {
final TimeValue validateAfterInactivitySnapshot = validateAfterInactivity;
@@ -296,7 +290,7 @@ public class PoolingHttpClientConnectionManager
}
if (stale) {
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connection {} is stale", id, ConnPoolSupport.getId(conn));
+ LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
}
poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
@@ -308,21 +302,14 @@ public class PoolingHttpClientConnectionManager
} else {
poolEntry.assignConnection(connFactory.createConnection(null));
}
- if (leaseFuture.isCancelled()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: endpoint lease cancelled", id);
- }
- pool.release(poolEntry, false);
- } else {
- this.endpoint = new InternalConnectionEndpoint(poolEntry);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: acquired {}", id, ConnPoolSupport.getId(endpoint));
- }
+ this.endpoint = new InternalConnectionEndpoint(poolEntry);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
}
return this.endpoint;
} catch (final Exception ex) {
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: endpoint lease failed", id);
+ LOG.debug("{} endpoint lease failed", id);
}
pool.release(poolEntry, false);
throw new ExecutionException(ex.getMessage(), ex);
@@ -346,7 +333,7 @@ public class PoolingHttpClientConnectionManager
return;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: releasing endpoint", ConnPoolSupport.getId(endpoint));
+ LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
}
final ManagedHttpClientConnection conn = entry.getConnection();
if (conn != null && keepAlive == null) {
@@ -365,11 +352,11 @@ public class PoolingHttpClientConnectionManager
} else {
s = "indefinitely";
}
- LOG.debug("{}: connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn), s);
+ LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn), s);
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connection is not kept alive", ConnPoolSupport.getId(endpoint));
+ LOG.debug("{} connection is not kept alive", ConnPoolSupport.getId(endpoint));
}
}
} catch (final RuntimeException ex) {
@@ -378,7 +365,7 @@ public class PoolingHttpClientConnectionManager
} finally {
this.pool.release(entry, reusable);
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
+ LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
}
}
}
@@ -402,7 +389,7 @@ public class PoolingHttpClientConnectionManager
host = route.getTargetHost();
}
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
+ LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
}
final ManagedHttpClientConnection conn = poolEntry.getConnection();
final SocketConfig defaultSocketConfigSnapshot = defaultSocketConfig;
@@ -414,7 +401,7 @@ public class PoolingHttpClientConnectionManager
defaultSocketConfigSnapshot != null ? defaultSocketConfigSnapshot : SocketConfig.DEFAULT,
context);
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
+ LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
}
}
@@ -526,7 +513,7 @@ public class PoolingHttpClientConnectionManager
InternalConnectionEndpoint(
final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry) {
this.poolEntryRef = new AtomicReference<>(poolEntry);
- this.id = String.format("ep-%08X", COUNT.getAndIncrement());
+ this.id = String.format("ep-%010d", COUNT.getAndIncrement());
}
@Override
@@ -591,7 +578,7 @@ public class PoolingHttpClientConnectionManager
Args.notNull(requestExecutor, "Request executor");
final ManagedHttpClientConnection connection = getValidatedPoolEntry().getConnection();
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
+ LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
}
return requestExecutor.execute(request, connection, context);
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
index 059debf..9d1250e 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
@@ -29,7 +29,10 @@ package org.apache.hc.client5.http.impl.nio;
import java.net.InetSocketAddress;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -47,6 +50,7 @@ import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
@@ -219,84 +223,114 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
final Timeout requestTimeout,
final FutureCallback<AsyncConnectionEndpoint> callback) {
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
+ LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
}
- final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
- final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
- route, state, requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
-
- void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
- final ManagedAsyncClientConnection connection = poolEntry.getConnection();
- if (connection != null) {
- connection.activate();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
- }
- final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: acquired {}", id, ConnPoolSupport.getId(endpoint));
- }
- resultFuture.completed(endpoint);
- }
-
- @Override
- public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
- final ManagedAsyncClientConnection connection = poolEntry.getConnection();
- if (connection != null) {
- if (connection.isOpen()) {
- final ProtocolVersion protocolVersion = connection.getProtocolVersion();
- if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
- final TimeValue timeValue = PoolingAsyncClientConnectionManager.this.validateAfterInactivity;
- if (TimeValue.isNonNegative(timeValue) &&
- poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
- connection.submitCommand(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
-
- @Override
- public void execute(final Boolean result) {
- if (result == null || !result) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connection {} is stale", id, ConnPoolSupport.getId(connection));
+ return new Future<AsyncConnectionEndpoint>() {
+
+ final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
+
+ final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
+ route,
+ state,
+ requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
+
+ @Override
+ public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
+ final ManagedAsyncClientConnection connection = poolEntry.getConnection();
+ if (connection != null) {
+ if (connection.isOpen()) {
+ final ProtocolVersion protocolVersion = connection.getProtocolVersion();
+ if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
+ final TimeValue timeValue = PoolingAsyncClientConnectionManager.this.validateAfterInactivity;
+ if (TimeValue.isNonNegative(timeValue) &&
+ poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
+ connection.submitCommand(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
+
+ @Override
+ public void execute(final Boolean result) {
+ if (result == null || !result) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
+ }
+ poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
- poolEntry.discardConnection(CloseMode.IMMEDIATE);
+ leaseCompleted(poolEntry);
}
- leaseCompleted(poolEntry);
- }
- })), Command.Priority.IMMEDIATE);
- return;
+ })), Command.Priority.IMMEDIATE);
+ return;
+ }
}
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
+ }
+ poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connection {} is closed", id, ConnPoolSupport.getId(connection));
- }
- poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
+ leaseCompleted(poolEntry);
}
- leaseCompleted(poolEntry);
- }
- @Override
- public void failed(final Exception ex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: endpoint lease failed", id);
+ void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
+ final ManagedAsyncClientConnection connection = poolEntry.getConnection();
+ if (connection != null) {
+ connection.activate();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
+ }
+ final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
+ }
+ resultFuture.completed(endpoint);
}
- resultFuture.failed(ex);
- }
- @Override
- public void cancelled() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: endpoint lease cancelled", id);
+ @Override
+ public void failed(final Exception ex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} endpoint lease failed", id);
+ }
+ resultFuture.failed(ex);
}
- resultFuture.cancel();
- }
- });
+ @Override
+ public void cancelled() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} endpoint lease cancelled", id);
+ }
+ resultFuture.cancel();
+ }
- resultFuture.setDependency(leaseFuture);
- return resultFuture;
+ });
+
+ @Override
+ public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
+ return resultFuture.get();
+ }
+
+ @Override
+ public AsyncConnectionEndpoint get(
+ final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return resultFuture.get(timeout, unit);
+ }
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ return leaseFuture.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isDone() {
+ return resultFuture.isDone();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return resultFuture.isCancelled();
+ }
+
+ };
}
@Override
@@ -308,7 +342,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
return;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: releasing endpoint", ConnPoolSupport.getId(endpoint));
+ LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
}
final ManagedAsyncClientConnection connection = entry.getConnection();
boolean reusable = connection != null && connection.isOpen();
@@ -324,7 +358,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
} else {
s = "indefinitely";
}
- LOG.debug("{}: connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
+ LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
}
}
} catch (final RuntimeException ex) {
@@ -333,7 +367,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
} finally {
pool.release(entry, reusable);
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
+ LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
}
}
}
@@ -365,7 +399,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
}
final InetSocketAddress localAddress = route.getLocalSocketAddress();
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
+ LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
}
final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
connectionInitiator, host, localAddress, connectTimeout, attachment, new FutureCallback<ManagedAsyncClientConnection>() {
@@ -374,7 +408,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
public void completed(final ManagedAsyncClientConnection connection) {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
+ LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
}
poolEntry.assignConnection(connection);
resultFuture.completed(internalEndpoint);
@@ -410,7 +444,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
final ManagedAsyncClientConnection connection = poolEntry.getConnection();
connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
+ LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
}
}
@@ -493,7 +527,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
this.poolEntryRef = new AtomicReference<>(poolEntry);
- this.id = String.format("ep-%08X", COUNT.getAndIncrement());
+ this.id = String.format("ep-%010d", COUNT.getAndIncrement());
}
@Override
@@ -525,7 +559,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
if (poolEntry != null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: close {}", id, closeMode);
+ LOG.debug("{} close {}", id, closeMode);
}
poolEntry.discardConnection(closeMode);
}
@@ -561,7 +595,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
final HttpContext context) {
final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
+ LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
}
connection.submitCommand(
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java
index 7945dd1..1533880 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java
@@ -30,7 +30,6 @@ package org.apache.hc.client5.http.impl.io;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -159,27 +158,6 @@ public class TestPoolingHttpClientConnectionManager {
Mockito.verify(pool).release(entry, false);
}
- @Test(expected= ExecutionException.class)
- public void testLeaseFutureCancelled() throws Exception {
- final HttpHost target = new HttpHost("localhost", 80);
- final HttpRoute route = new HttpRoute(target);
-
- final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
- entry.assignConnection(conn);
-
- Mockito.when(future.isCancelled()).thenReturn(Boolean.TRUE);
- Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
- Mockito.when(pool.lease(
- Mockito.eq(route),
- Mockito.eq(null),
- Mockito.<Timeout>any(),
- Mockito.<FutureCallback<PoolEntry<HttpRoute, ManagedHttpClientConnection>>>eq(null)))
- .thenReturn(future);
-
- final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
- connRequest1.get(Timeout.ofSeconds(1));
- }
-
@Test(expected=TimeoutException.class)
public void testLeaseFutureTimeout() throws Exception {
final HttpHost target = new HttpHost("localhost", 80);