You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2021/06/28 20:33:21 UTC
[httpcomponents-client] branch master updated: Fixed connection
lease request cancellation race in both classic and asyc pooling connection
managers
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git
The following commit(s) were added to refs/heads/master by this push:
new 29ba623 Fixed connection lease request cancellation race in both classic and asyc pooling connection managers
29ba623 is described below
commit 29ba623ebeec67cd6e8d940b2fed9151c16e4daa
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sun Jun 20 21:39:18 2021 +0200
Fixed connection lease request cancellation race in both classic and asyc pooling connection managers
---
.../hc/client5/testing/async/TestHttp1Async.java | 78 +++++++++++
.../testing/sync/TestClientRequestExecution.java | 62 +++++++++
.../http/impl/classic/InternalExecRuntime.java | 17 +--
.../io/PoolingHttpClientConnectionManager.java | 17 +--
.../nio/PoolingAsyncClientConnectionManager.java | 144 +++++++++++++--------
.../io/TestPoolingHttpClientConnectionManager.java | 35 -----
6 files changed, 237 insertions(+), 116 deletions(-)
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java
index 5d1eccd..3cdacd7 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java
@@ -28,7 +28,12 @@ package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
@@ -200,4 +205,77 @@ public class TestHttp1Async extends AbstractHttpAsyncFundamentalsTest<CloseableH
MatcherAssert.assertThat(body3.length(), CoreMatchers.equalTo(2048));
}
+ @Test
+ public void testRequestCancellation() throws Exception {
+ this.connManager.setDefaultMaxPerRoute(1);
+ this.connManager.setMaxTotal(1);
+
+ final HttpHost target = start();
+
+ final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ try {
+
+ for (int i = 0; i < 20; i++) {
+ final Future<SimpleHttpResponse> future = httpclient.execute(
+ SimpleRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath("/random/1000")
+ .build(), null);
+
+ executorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ future.cancel(true);
+ }
+ }, i % 5, TimeUnit.MILLISECONDS);
+
+ try {
+ future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ } catch (final TimeoutException ex) {
+ throw ex;
+ } catch (final Exception ignore) {
+ }
+ }
+
+ final Random rnd = new Random();
+ for (int i = 0; i < 20; i++) {
+ final Future<SimpleHttpResponse> future = httpclient.execute(
+ SimpleRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath("/random/1000")
+ .build(), null);
+
+ executorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ future.cancel(true);
+ }
+ }, rnd.nextInt(200), TimeUnit.MILLISECONDS);
+
+ try {
+ future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ } catch (final TimeoutException ex) {
+ throw ex;
+ } catch (final Exception ignore) {
+ }
+ }
+
+ for (int i = 0; i < 5; i++) {
+ final Future<SimpleHttpResponse> future = httpclient.execute(
+ SimpleRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath("/random/1000")
+ .build(), null);
+ final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ MatcherAssert.assertThat(response, CoreMatchers.notNullValue());
+ MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
+ }
+
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
}
\ No newline at end of file
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java
index d6f2bc2..bfb16c3 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java
@@ -29,6 +29,10 @@ package org.apache.hc.client5.testing.sync;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.client5.http.classic.methods.HttpGet;
@@ -281,4 +285,62 @@ public class TestClientRequestExecution extends LocalServerTestBase {
Assert.assertEquals(uri, location);
}
+ @Test
+ public void testRequestCancellation() throws Exception {
+ this.connManager.setDefaultMaxPerRoute(1);
+ this.connManager.setMaxTotal(1);
+
+ final HttpHost target = start();
+
+ final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ try {
+
+ for (int i = 0; i < 20; i++) {
+ final HttpGet httpget = new HttpGet("/random/1000");
+
+ executorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ httpget.cancel();
+ }
+ }, 1, TimeUnit.MILLISECONDS);
+
+ try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
+ EntityUtils.consume(response.getEntity());
+ } catch (final Exception ignore) {
+ }
+ }
+
+ final Random rnd = new Random();
+ for (int i = 0; i < 20; i++) {
+ final HttpGet httpget = new HttpGet("/random/1000");
+
+ executorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ httpget.cancel();
+ }
+ }, rnd.nextInt(200), TimeUnit.MILLISECONDS);
+
+ try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
+ EntityUtils.consume(response.getEntity());
+ } catch (final Exception ignore) {
+ }
+
+ }
+
+ for (int i = 0; i < 5; i++) {
+ final HttpGet httpget = new HttpGet("/random/1000");
+ try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
+ EntityUtils.consume(response.getEntity());
+ }
+ }
+
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java
index ba0d0d1..790f076 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java
@@ -104,10 +104,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
state = object;
if (cancellableDependency != null) {
cancellableDependency.setDependency(connRequest);
- if (cancellableDependency.isCancelled()) {
- connRequest.cancel();
- throw new RequestFailedException("Request aborted");
- }
}
try {
final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
@@ -115,10 +111,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
reusable = connectionEndpoint.isConnected();
if (cancellableDependency != null) {
cancellableDependency.setDependency(this);
- if (cancellableDependency.isCancelled()) {
- cancel();
- throw new RequestFailedException("Request aborted");
- }
}
if (log.isDebugEnabled()) {
log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
@@ -155,10 +147,8 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
}
private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException {
- if (cancellableDependency != null) {
- if (cancellableDependency.isCancelled()) {
- throw new RequestFailedException("Request aborted");
- }
+ if (isExecutionAborted()) {
+ throw new RequestFailedException("Request aborted");
}
final RequestConfig requestConfig = context.getRequestConfig();
final Timeout connectTimeout = requestConfig.getConnectTimeout();
@@ -208,6 +198,9 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
if (!endpoint.isConnected()) {
connectEndpoint(endpoint, context);
}
+ if (isExecutionAborted()) {
+ throw new RequestFailedException("Request aborted");
+ }
final RequestConfig requestConfig = context.getRequestConfig();
final Timeout responseTimeout = requestConfig.getResponseTimeout();
if (responseTimeout != null) {
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
index 00b40e1..22a29e9 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
@@ -28,7 +28,6 @@ package org.apache.hc.client5.http.impl.io;
import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
@@ -288,9 +287,6 @@ public class PoolingHttpClientConnectionManager
final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
try {
poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
- if (poolEntry == null || leaseFuture.isCancelled()) {
- throw new ExecutionException(new CancellationException("Operation cancelled"));
- }
} catch (final TimeoutException ex) {
leaseFuture.cancel(true);
throw ex;
@@ -325,16 +321,9 @@ public class PoolingHttpClientConnectionManager
} else {
poolEntry.assignConnection(connFactory.createConnection(null));
}
- if (leaseFuture.isCancelled()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} endpoint lease cancelled", id);
- }
- pool.release(poolEntry, false);
- } else {
- this.endpoint = new InternalConnectionEndpoint(poolEntry);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
- }
+ this.endpoint = new InternalConnectionEndpoint(poolEntry);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
}
return this.endpoint;
} catch (final Exception ex) {
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
index 53c34ab..3915e3a 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
@@ -29,7 +29,10 @@ package org.apache.hc.client5.http.impl.nio;
import java.net.InetSocketAddress;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -48,6 +51,7 @@ import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Resolver;
@@ -228,72 +232,102 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
}
- final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
- final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
- final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
- route, state, requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
-
- void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
- final ManagedAsyncClientConnection connection = poolEntry.getConnection();
- if (connection != null) {
- connection.activate();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
- }
- final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
- }
- resultFuture.completed(endpoint);
- }
-
- @Override
- public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
- final ManagedAsyncClientConnection connection = poolEntry.getConnection();
- final TimeValue timeValue = connectionConfig != null ? connectionConfig.getValidateAfterInactivity() : null;
- if (TimeValue.isNonNegative(timeValue) && connection != null &&
- poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
- final ProtocolVersion protocolVersion = connection.getProtocolVersion();
- if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
- connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
- if (result == null || !result) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
+ return new Future<AsyncConnectionEndpoint>() {
+
+ final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
+ final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
+
+ final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
+ route,
+ state,
+ requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
+
+ @Override
+ public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
+ final ManagedAsyncClientConnection connection = poolEntry.getConnection();
+ final TimeValue timeValue = connectionConfig != null ? connectionConfig.getValidateAfterInactivity() : null;
+ if (TimeValue.isNonNegative(timeValue) && connection != null &&
+ poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
+ final ProtocolVersion protocolVersion = connection.getProtocolVersion();
+ if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
+ connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
+ if (result == null || !result) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
+ }
+ poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
- poolEntry.discardConnection(CloseMode.IMMEDIATE);
+ })), Command.Priority.IMMEDIATE);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
}
- })), Command.Priority.IMMEDIATE);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
+ poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
- poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
+ leaseCompleted(poolEntry);
}
- leaseCompleted(poolEntry);
- }
- @Override
- public void failed(final Exception ex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} endpoint lease failed", id);
+ void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
+ final ManagedAsyncClientConnection connection = poolEntry.getConnection();
+ if (connection != null) {
+ connection.activate();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
+ }
+ final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
+ }
+ resultFuture.completed(endpoint);
}
- resultFuture.failed(ex);
- }
- @Override
- public void cancelled() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} endpoint lease cancelled", id);
+ @Override
+ public void failed(final Exception ex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} endpoint lease failed", id);
+ }
+ resultFuture.failed(ex);
}
- resultFuture.cancel();
- }
- });
+ @Override
+ public void cancelled() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} endpoint lease cancelled", id);
+ }
+ resultFuture.cancel();
+ }
- resultFuture.setDependency(leaseFuture);
- return resultFuture;
+ });
+
+ @Override
+ public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
+ return resultFuture.get();
+ }
+
+ @Override
+ public AsyncConnectionEndpoint get(
+ final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return resultFuture.get(timeout, unit);
+ }
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ return leaseFuture.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isDone() {
+ return resultFuture.isDone();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return resultFuture.isCancelled();
+ }
+
+ };
}
@Override
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java
index 14cce07..29dbdd9 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java
@@ -30,7 +30,6 @@ package org.apache.hc.client5.http.impl.io;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -100,11 +99,8 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
- Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
-
Mockito.when(conn.isOpen()).thenReturn(true);
Mockito.when(conn.isConsistent()).thenReturn(true);
- Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@@ -130,9 +126,6 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
- Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
-
- Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@@ -152,28 +145,6 @@ public class TestPoolingHttpClientConnectionManager {
}
@Test
- public void testLeaseFutureCancelled() throws Exception {
- final HttpHost target = new HttpHost("localhost", 80);
- final HttpRoute route = new HttpRoute(target);
-
- final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
- entry.assignConnection(conn);
-
- Mockito.when(future.isCancelled()).thenReturn(Boolean.TRUE);
- Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
- Mockito.when(pool.lease(
- Mockito.eq(route),
- Mockito.eq(null),
- Mockito.any(),
- Mockito.eq(null)))
- .thenReturn(future);
-
- final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
- Assert.assertThrows(ExecutionException.class, () ->
- connRequest1.get(Timeout.ofSeconds(1)));
- }
-
- @Test
public void testLeaseFutureTimeout() throws Exception {
final HttpHost target = new HttpHost("localhost", 80);
final HttpRoute route = new HttpRoute(target);
@@ -199,7 +170,6 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
- Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@@ -229,7 +199,6 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
- Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@@ -260,9 +229,7 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
- Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(conn.isOpen()).thenReturn(false);
- Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@@ -313,9 +280,7 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
- Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(conn.isOpen()).thenReturn(false);
- Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),