You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/08/16 07:31:31 UTC
[httpcomponents-core] branch master updated: HTTPCORE-589 Fix
LaxConnPool to ensure available connections are assigned to pending
requests whenever possible.
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
The following commit(s) were added to refs/heads/master by this push:
new 68a1d78 HTTPCORE-589 Fix LaxConnPool to ensure available connections are assigned to pending requests whenever possible.
68a1d78 is described below
commit 68a1d783314de5e53291f3f7361125089d6044e6
Author: Linton Miller <li...@coxautoinc.com>
AuthorDate: Mon Aug 12 05:51:15 2019 +0100
HTTPCORE-589 Fix LaxConnPool to ensure available connections are
assigned to pending requests whenever possible.
---
.../java/org/apache/hc/core5/pool/LaxConnPool.java | 65 ++++++++++++++--------
1 file changed, 42 insertions(+), 23 deletions(-)
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java
index 998896d..56e7e88 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicMarkableReference;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Experimental;
@@ -341,6 +342,8 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
static class PerRoutePool<T, C extends ModalCloseable> {
+ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
+
private final T route;
private final TimeValue timeToLive;
private final PoolReusePolicy policy;
@@ -351,6 +354,7 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
private final Deque<LeaseRequest<T, C>> pending;
private final AtomicBoolean terminated;
private final AtomicInteger allocated;
+ private final AtomicLong releaseSeqNum;
private volatile int max;
@@ -372,6 +376,7 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
this.pending = new ConcurrentLinkedDeque<>();
this.terminated = new AtomicBoolean(false);
this.allocated = new AtomicInteger(0);
+ this.releaseSeqNum = new AtomicLong(0);
this.max = max;
}
@@ -392,14 +397,14 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
}
}
- private boolean allocatePoolEntry() {
+ private PoolEntry<T, C> createPoolEntry() {
final int poolmax = max;
int prev, next;
do {
prev = allocated.get();
next = (prev<poolmax)? prev+1 : prev;
} while (!allocated.compareAndSet(prev, next));
- return prev < next;
+ return (prev < next)? new PoolEntry<T,C>(route, timeToLive) : null;
}
private void deallocatePoolEntry() {
@@ -447,17 +452,18 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
final FutureCallback<PoolEntry<T, C>> callback) {
Asserts.check(!terminated.get(), "Connection pool shut down");
final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
- final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
- if (availableEntry != null) {
- addLeased(availableEntry);
- future.completed(availableEntry);
+ final long releaseState = releaseSeqNum.get();
+ PoolEntry<T, C> entry = getAvailableEntry(state);
+ if (entry == null && pending.isEmpty()) {
+ entry = createPoolEntry();
+ }
+ if (entry != null) {
+ addLeased(entry);
+ future.completed(entry);
} else {
- if (pending.isEmpty() && allocatePoolEntry()) {
- final PoolEntry<T, C> entry = new PoolEntry<>(route, timeToLive);
- addLeased(entry);
- future.completed(entry);
- } else {
- pending.add(new LeaseRequest<>(state, requestTimeout, future));
+ pending.add(new LeaseRequest<>(state, requestTimeout, future));
+ if (releaseState != releaseSeqNum.get()) {
+ servicePendingRequest();
}
}
return future;
@@ -483,10 +489,16 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
else {
deallocatePoolEntry();
}
+ releaseSeqNum.incrementAndGet();
servicePendingRequest();
}
+
private void servicePendingRequest() {
+ servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
+ }
+
+ private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
LeaseRequest<T, C> leaseRequest;
while ((leaseRequest = pending.poll()) != null) {
if (leaseRequest.isDone()) {
@@ -498,18 +510,24 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
if (deadline.isExpired()) {
leaseRequest.failed(DeadlineTimeoutException.from(deadline));
} else {
- final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
- if (availableEntry != null) {
- addLeased(availableEntry);
- leaseRequest.completed(availableEntry);
- } else if (allocatePoolEntry()) {
- final PoolEntry<T, C> newEntry = new PoolEntry<>(route, timeToLive);
- addLeased(newEntry);
- leaseRequest.completed(newEntry);
- } else {
+ final long releaseState = releaseSeqNum.get();
+ PoolEntry<T, C> entry = getAvailableEntry(state);
+ if (entry == null) {
+ entry = createPoolEntry();
+ }
+ if (entry != null) {
+ addLeased(entry);
+ leaseRequest.completed(entry);
+ if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
+ break;
+ }
+ }
+ else {
pending.addFirst(leaseRequest);
+ if (releaseState == releaseSeqNum.get()) {
+ break;
+ }
}
- break;
}
}
}
@@ -572,7 +590,8 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
}
}
}
- servicePendingRequest();
+ releaseSeqNum.incrementAndGet();
+ servicePendingRequests(RequestServiceStrategy.ALL);
}
public void enumLeased(final Callback<PoolEntry<T, C>> callback) {