You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2013/06/15 18:21:06 UTC

svn commit: r1493390 - in /httpcomponents/httpcore/trunk: RELEASE_NOTES.txt httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java

Author: olegk
Date: Sat Jun 15 16:21:06 2013
New Revision: 1493390

URL: http://svn.apache.org/r1493390
Log:
HTTPCORE-343: AbstractNIOConnPool to fire request callbacks outside the pool lock; this makes it possible to re-enter pool methods from a callback event

Modified:
    httpcomponents/httpcore/trunk/RELEASE_NOTES.txt
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java

Modified: httpcomponents/httpcore/trunk/RELEASE_NOTES.txt
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/RELEASE_NOTES.txt?rev=1493390&r1=1493389&r2=1493390&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/RELEASE_NOTES.txt (original)
+++ httpcomponents/httpcore/trunk/RELEASE_NOTES.txt Sat Jun 15 16:21:06 2013
@@ -1,5 +1,8 @@
 Changes since Release 4.3-BETA2 
 -------------------
+* [HTTPCORE-343] AbstractNIOConnPool to fire request callbacks outside the pool lock.
+  This makes it possible to re-enter pool methods from a callback event.
+  Contributed by Oleg Kalnichevski <olegk at apache.org>
 
 * [HTTPCORE-340] AbstractNIOConnPool to support lease timeout distinct from connect timeout.  
   Contributed by Ignat Alexeyenko <ignatalexeyenko at gmail.com>

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java?rev=1493390&r1=1493389&r2=1493390&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java Sat Jun 15 16:21:06 2013
@@ -35,9 +35,11 @@ import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -78,10 +80,11 @@ public abstract class AbstractNIOConnPoo
     private final Set<SessionRequest> pending;
     private final Set<E> leased;
     private final LinkedList<E> available;
+    private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
     private final Map<T, Integer> maxPerRoute;
     private final Lock lock;
+    private final AtomicBoolean isShutDown;
 
-    private volatile boolean isShutDown;
     private volatile int defaultMaxPerRoute;
     private volatile int maxTotal;
 
@@ -120,7 +123,9 @@ public abstract class AbstractNIOConnPoo
         this.leased = new HashSet<E>();
         this.available = new LinkedList<E>();
         this.maxPerRoute = new HashMap<T, Integer>();
+        this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
         this.lock = new ReentrantLock();
+        this.isShutDown = new AtomicBoolean(false);
         this.defaultMaxPerRoute = defaultMaxPerRoute;
         this.maxTotal = maxTotal;
     }
@@ -149,8 +154,10 @@ public abstract class AbstractNIOConnPoo
         this.pending = new HashSet<SessionRequest>();
         this.leased = new HashSet<E>();
         this.available = new LinkedList<E>();
+        this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
         this.maxPerRoute = new HashMap<T, Integer>();
         this.lock = new ReentrantLock();
+        this.isShutDown = new AtomicBoolean(false);
         this.defaultMaxPerRoute = defaultMaxPerRoute;
         this.maxTotal = maxTotal;
     }
@@ -174,36 +181,35 @@ public abstract class AbstractNIOConnPoo
     protected abstract E createEntry(T route, C conn);
 
     public boolean isShutdown() {
-        return this.isShutDown;
+        return this.isShutDown.get();
     }
 
     public void shutdown(final long waitMs) throws IOException {
-        if (this.isShutDown) {
-            return ;
-        }
-        this.isShutDown = true;
-        this.lock.lock();
-        try {
-            for (final SessionRequest sessionRequest: this.pending) {
-                sessionRequest.cancel();
-            }
-            for (final E entry: this.available) {
-                entry.close();
-            }
-            for (final E entry: this.leased) {
-                entry.close();
-            }
-            for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
-                pool.shutdown();
+        if (this.isShutDown.compareAndSet(false, true)) {
+            fireCallbacks();
+            this.lock.lock();
+            try {
+                for (final SessionRequest sessionRequest: this.pending) {
+                    sessionRequest.cancel();
+                }
+                for (final E entry: this.available) {
+                    entry.close();
+                }
+                for (final E entry: this.leased) {
+                    entry.close();
+                }
+                for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
+                    pool.shutdown();
+                }
+                this.routeToPool.clear();
+                this.leased.clear();
+                this.pending.clear();
+                this.available.clear();
+                this.leasingRequests.clear();
+                this.ioreactor.shutdown(waitMs);
+            } finally {
+                this.lock.unlock();
             }
-            this.routeToPool.clear();
-            this.leased.clear();
-            this.pending.clear();
-            this.available.clear();
-            this.leasingRequests.clear();
-            this.ioreactor.shutdown(waitMs);
-        } finally {
-            this.lock.unlock();
         }
     }
 
@@ -239,20 +245,24 @@ public abstract class AbstractNIOConnPoo
             final FutureCallback<E> callback) {
         Args.notNull(route, "Route");
         Args.notNull(tunit, "Time unit");
-        Asserts.check(!this.isShutDown, "Connection pool shut down");
+        Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
+        final BasicFuture<E> future = new BasicFuture<E>(callback);
         this.lock.lock();
         try {
             final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
-            final BasicFuture<E> future = new BasicFuture<E>(callback);
             final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, leaseTimeout, future);
             final boolean completed = processPendingRequest(request);
             if (!request.isDone() && !completed) {
                 this.leasingRequests.add(request);
             }
-            return future;
+            if (request.isDone()) {
+                this.completedRequests.add(request);
+            }
         } finally {
             this.lock.unlock();
         }
+        fireCallbacks();
+        return future;
     }
 
     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
@@ -267,7 +277,7 @@ public abstract class AbstractNIOConnPoo
         if (entry == null) {
             return;
         }
-        if (this.isShutDown) {
+        if (this.isShutDown.get()) {
             return;
         }
         this.lock.lock();
@@ -285,16 +295,20 @@ public abstract class AbstractNIOConnPoo
         } finally {
             this.lock.unlock();
         }
+        fireCallbacks();
     }
 
     private void processPendingRequests() {
         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
         while (it.hasNext()) {
             final LeaseRequest<T, C, E> request = it.next();
-            processPendingRequest(request);
-            if (request.isDone()) {
+            final boolean completed = processPendingRequest(request);
+            if (request.isDone() || completed) {
                 it.remove();
             }
+            if (request.isDone()) {
+                this.completedRequests.add(request);
+            }
         }
     }
 
@@ -306,6 +320,9 @@ public abstract class AbstractNIOConnPoo
             if (request.isDone() || completed) {
                 it.remove();
             }
+            if (request.isDone()) {
+                this.completedRequests.add(request);
+            }
             if (completed) {
                 return;
             }
@@ -316,11 +333,10 @@ public abstract class AbstractNIOConnPoo
         final T route = request.getRoute();
         final Object state = request.getState();
         final long deadline = request.getDeadline();
-        final BasicFuture<E> future = request.getFuture();
 
         final long now = System.currentTimeMillis();
         if (now > deadline) {
-            future.failed(new TimeoutException());
+            request.failed(new TimeoutException());
             return false;
         }
 
@@ -342,7 +358,7 @@ public abstract class AbstractNIOConnPoo
         if (entry != null) {
             this.available.remove(entry);
             this.leased.add(entry);
-            future.completed(entry);
+            request.completed(entry);
             return true;
         }
 
@@ -384,7 +400,7 @@ public abstract class AbstractNIOConnPoo
                 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
                 localAddress = this.addressResolver.resolveLocalAddress(route);
             } catch (final IOException ex) {
-                future.failed(ex);
+                request.failed(ex);
                 return false;
             }
 
@@ -394,13 +410,29 @@ public abstract class AbstractNIOConnPoo
                     (int) request.getConnectTimeout() : Integer.MAX_VALUE;
             sessionRequest.setConnectTimeout(timout);
             this.pending.add(sessionRequest);
-            pool.addPending(sessionRequest, future);
+            pool.addPending(sessionRequest, request.getFuture());
             return true;
         } else {
             return false;
         }
     }
 
+    private void fireCallbacks() {
+        LeaseRequest<T, C, E> request;
+        while ((request = this.completedRequests.poll()) != null) {
+            final BasicFuture<E> future = request.getFuture();
+            final Exception ex = request.getException();
+            final E result = request.getResult();
+            if (ex != null) {
+                future.failed(ex);
+            } else if (result != null) {
+                future.completed(result);
+            } else {
+                future.cancel();
+            }
+        }
+    }
+
     public void validatePendingRequests() {
         this.lock.lock();
         try {
@@ -411,17 +443,18 @@ public abstract class AbstractNIOConnPoo
                 final long deadline = request.getDeadline();
                 if (now > deadline) {
                     it.remove();
-                    final BasicFuture<E> future = request.getFuture();
-                    future.failed(new TimeoutException());
+                    request.failed(new TimeoutException());
+                    this.completedRequests.add(request);
                 }
             }
         } finally {
             this.lock.unlock();
         }
+        fireCallbacks();
     }
 
     protected void requestCompleted(final SessionRequest request) {
-        if (this.isShutDown) {
+        if (this.isShutDown.get()) {
             return;
         }
         @SuppressWarnings("unchecked")
@@ -444,10 +477,11 @@ public abstract class AbstractNIOConnPoo
         } finally {
             this.lock.unlock();
         }
+        fireCallbacks();
     }
 
     protected void requestCancelled(final SessionRequest request) {
-        if (this.isShutDown) {
+        if (this.isShutDown.get()) {
             return;
         }
         @SuppressWarnings("unchecked")
@@ -462,10 +496,11 @@ public abstract class AbstractNIOConnPoo
         } finally {
             this.lock.unlock();
         }
+        fireCallbacks();
     }
 
     protected void requestFailed(final SessionRequest request) {
-        if (this.isShutDown) {
+        if (this.isShutDown.get()) {
             return;
         }
         @SuppressWarnings("unchecked")
@@ -480,10 +515,11 @@ public abstract class AbstractNIOConnPoo
         } finally {
             this.lock.unlock();
         }
+        fireCallbacks();
     }
 
     protected void requestTimeout(final SessionRequest request) {
-        if (this.isShutDown) {
+        if (this.isShutDown.get()) {
             return;
         }
         @SuppressWarnings("unchecked")
@@ -498,6 +534,7 @@ public abstract class AbstractNIOConnPoo
         } finally {
             this.lock.unlock();
         }
+        fireCallbacks();
     }
 
     private int getMax(final T route) {

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java?rev=1493390&r1=1493389&r2=1493390&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java Sat Jun 15 16:21:06 2013
@@ -30,6 +30,8 @@ import org.apache.http.annotation.Immuta
 import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.pool.PoolEntry;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 @Immutable
 class LeaseRequest<T, C, E extends PoolEntry<T, C>> {
 
@@ -38,6 +40,9 @@ class LeaseRequest<T, C, E extends PoolE
     private final long connectTimeout;
     private final long deadline;
     private final BasicFuture<E> future;
+    private final AtomicBoolean completed;
+    private volatile E result;
+    private volatile Exception ex;
 
     /**
      * Contructor
@@ -60,6 +65,7 @@ class LeaseRequest<T, C, E extends PoolE
         this.deadline = leaseTimeout > 0 ? System.currentTimeMillis() + leaseTimeout :
                 Long.MAX_VALUE;
         this.future = future;
+        this.completed = new AtomicBoolean(false);
     }
 
     public T getRoute() {
@@ -78,12 +84,32 @@ class LeaseRequest<T, C, E extends PoolE
         return this.deadline;
     }
 
+    public boolean isDone() {
+        return this.completed.get();
+    }
+
+    public void failed(final Exception ex) {
+        if (this.completed.compareAndSet(false, true)) {
+            this.ex = ex;
+        }
+    }
+
+    public void completed(final E result) {
+        if (this.completed.compareAndSet(false, true)) {
+            this.result = result;
+        }
+    }
+
     public BasicFuture<E> getFuture() {
         return this.future;
     }
 
-    public boolean isDone() {
-        return this.future.isDone();
+    public E getResult() {
+        return this.result;
+    }
+
+    public Exception getException() {
+        return this.ex;
     }
 
     @Override