You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this export.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/08/16 07:31:31 UTC

[httpcomponents-core] branch master updated: HTTPCORE-589 Fix LaxConnPool to ensure available connections are assigned to pending requests whenever possible.

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 68a1d78  HTTPCORE-589 Fix LaxConnPool to ensure available connections are assigned to pending requests whenever possible.
68a1d78 is described below

commit 68a1d783314de5e53291f3f7361125089d6044e6
Author: Linton Miller <li...@coxautoinc.com>
AuthorDate: Mon Aug 12 05:51:15 2019 +0100

    HTTPCORE-589 Fix LaxConnPool to ensure available connections are
    assigned to pending requests whenever possible.
---
 .../java/org/apache/hc/core5/pool/LaxConnPool.java | 65 ++++++++++++++--------
 1 file changed, 42 insertions(+), 23 deletions(-)

diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java
index 998896d..56e7e88 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicMarkableReference;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.Experimental;
@@ -341,6 +342,8 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
 
     static class PerRoutePool<T, C extends ModalCloseable> {
 
+        private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
+
         private final T route;
         private final TimeValue timeToLive;
         private final PoolReusePolicy policy;
@@ -351,6 +354,7 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
         private final Deque<LeaseRequest<T, C>> pending;
         private final AtomicBoolean terminated;
         private final AtomicInteger allocated;
+        private final AtomicLong releaseSeqNum;
 
         private volatile int max;
 
@@ -372,6 +376,7 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
             this.pending = new ConcurrentLinkedDeque<>();
             this.terminated = new AtomicBoolean(false);
             this.allocated = new AtomicInteger(0);
+            this.releaseSeqNum = new AtomicLong(0);
             this.max = max;
         }
 
@@ -392,14 +397,14 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
             }
         }
 
-        private boolean allocatePoolEntry() {
+        private PoolEntry<T, C> createPoolEntry() {
             final int poolmax = max;
             int prev, next;
             do {
                 prev = allocated.get();
                 next = (prev<poolmax)? prev+1 : prev;
             } while (!allocated.compareAndSet(prev, next));
-            return prev < next;
+            return (prev < next)? new PoolEntry<T,C>(route, timeToLive) : null;
         }
 
         private void deallocatePoolEntry() {
@@ -447,17 +452,18 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
                 final FutureCallback<PoolEntry<T, C>> callback) {
             Asserts.check(!terminated.get(), "Connection pool shut down");
             final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
-            final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
-            if (availableEntry != null) {
-                addLeased(availableEntry);
-                future.completed(availableEntry);
+            final long releaseState = releaseSeqNum.get();
+            PoolEntry<T, C> entry = getAvailableEntry(state);
+            if (entry == null && pending.isEmpty()) {
+                entry = createPoolEntry();
+            }
+            if (entry != null) {
+                addLeased(entry);
+                future.completed(entry);
             } else {
-                if (pending.isEmpty() && allocatePoolEntry()) {
-                    final PoolEntry<T, C> entry = new PoolEntry<>(route, timeToLive);
-                    addLeased(entry);
-                    future.completed(entry);
-                } else {
-                    pending.add(new LeaseRequest<>(state, requestTimeout, future));
+                pending.add(new LeaseRequest<>(state, requestTimeout, future));
+                if (releaseState != releaseSeqNum.get()) {
+                    servicePendingRequest();
                 }
             }
             return future;
@@ -483,10 +489,16 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
             else {
                 deallocatePoolEntry();
             }
+            releaseSeqNum.incrementAndGet();
             servicePendingRequest();
         }
 
+
         private void servicePendingRequest() {
+            servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
+        }
+
+        private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
             LeaseRequest<T, C> leaseRequest;
             while ((leaseRequest = pending.poll()) != null) {
                 if (leaseRequest.isDone()) {
@@ -498,18 +510,24 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
                 if (deadline.isExpired()) {
                     leaseRequest.failed(DeadlineTimeoutException.from(deadline));
                 } else {
-                    final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
-                    if (availableEntry != null) {
-                        addLeased(availableEntry);
-                        leaseRequest.completed(availableEntry);
-                    } else if (allocatePoolEntry()) {
-                        final PoolEntry<T, C> newEntry = new PoolEntry<>(route, timeToLive);
-                        addLeased(newEntry);
-                        leaseRequest.completed(newEntry);
-                    } else {
+                    final long releaseState = releaseSeqNum.get();
+                    PoolEntry<T, C> entry = getAvailableEntry(state);
+                    if (entry == null) {
+                        entry = createPoolEntry();
+                    }
+                    if (entry != null) {
+                        addLeased(entry);
+                        leaseRequest.completed(entry);
+                        if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
+                            break;
+                        }
+                    }
+                    else {
                         pending.addFirst(leaseRequest);
+                        if (releaseState == releaseSeqNum.get()) {
+                            break;
+                        }
                     }
-                    break;
                 }
             }
         }
@@ -572,7 +590,8 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
                     }
                 }
             }
-            servicePendingRequest();
+            releaseSeqNum.incrementAndGet();
+            servicePendingRequests(RequestServiceStrategy.ALL);
         }
 
         public void enumLeased(final Callback<PoolEntry<T, C>> callback) {