You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2013/06/15 18:21:06 UTC
svn commit: r1493390 - in /httpcomponents/httpcore/trunk: RELEASE_NOTES.txt
httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java
httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java
Author: olegk
Date: Sat Jun 15 16:21:06 2013
New Revision: 1493390
URL: http://svn.apache.org/r1493390
Log:
HTTPCORE-343: AbstractNIOConnPool to fire request callbacks outside the pool lock; this makes it possible to re-enter pool methods from a callback event
Modified:
httpcomponents/httpcore/trunk/RELEASE_NOTES.txt
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java
Modified: httpcomponents/httpcore/trunk/RELEASE_NOTES.txt
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/RELEASE_NOTES.txt?rev=1493390&r1=1493389&r2=1493390&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/RELEASE_NOTES.txt (original)
+++ httpcomponents/httpcore/trunk/RELEASE_NOTES.txt Sat Jun 15 16:21:06 2013
@@ -1,5 +1,8 @@
Changes since Release 4.3-BETA2
-------------------
+* [HTTPCORE-343] AbstractNIOConnPool to fire request callbacks outside the pool lock.
+ This makes it possible to re-enter pool methods from a callback event.
+ Contributed by Oleg Kalnichevski <olegk at apache.org>
* [HTTPCORE-340] AbstractNIOConnPool to support lease timeout distinct from connect timeout.
Contributed by Ignat Alexeyenko <ignatalexeyenko at gmail.com>
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java?rev=1493390&r1=1493389&r2=1493390&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java Sat Jun 15 16:21:06 2013
@@ -35,9 +35,11 @@ import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -78,10 +80,11 @@ public abstract class AbstractNIOConnPoo
private final Set<SessionRequest> pending;
private final Set<E> leased;
private final LinkedList<E> available;
+ private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
private final Map<T, Integer> maxPerRoute;
private final Lock lock;
+ private final AtomicBoolean isShutDown;
- private volatile boolean isShutDown;
private volatile int defaultMaxPerRoute;
private volatile int maxTotal;
@@ -120,7 +123,9 @@ public abstract class AbstractNIOConnPoo
this.leased = new HashSet<E>();
this.available = new LinkedList<E>();
this.maxPerRoute = new HashMap<T, Integer>();
+ this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
this.lock = new ReentrantLock();
+ this.isShutDown = new AtomicBoolean(false);
this.defaultMaxPerRoute = defaultMaxPerRoute;
this.maxTotal = maxTotal;
}
@@ -149,8 +154,10 @@ public abstract class AbstractNIOConnPoo
this.pending = new HashSet<SessionRequest>();
this.leased = new HashSet<E>();
this.available = new LinkedList<E>();
+ this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
this.maxPerRoute = new HashMap<T, Integer>();
this.lock = new ReentrantLock();
+ this.isShutDown = new AtomicBoolean(false);
this.defaultMaxPerRoute = defaultMaxPerRoute;
this.maxTotal = maxTotal;
}
@@ -174,36 +181,35 @@ public abstract class AbstractNIOConnPoo
protected abstract E createEntry(T route, C conn);
public boolean isShutdown() {
- return this.isShutDown;
+ return this.isShutDown.get();
}
public void shutdown(final long waitMs) throws IOException {
- if (this.isShutDown) {
- return ;
- }
- this.isShutDown = true;
- this.lock.lock();
- try {
- for (final SessionRequest sessionRequest: this.pending) {
- sessionRequest.cancel();
- }
- for (final E entry: this.available) {
- entry.close();
- }
- for (final E entry: this.leased) {
- entry.close();
- }
- for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
- pool.shutdown();
+ if (this.isShutDown.compareAndSet(false, true)) {
+ fireCallbacks();
+ this.lock.lock();
+ try {
+ for (final SessionRequest sessionRequest: this.pending) {
+ sessionRequest.cancel();
+ }
+ for (final E entry: this.available) {
+ entry.close();
+ }
+ for (final E entry: this.leased) {
+ entry.close();
+ }
+ for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
+ pool.shutdown();
+ }
+ this.routeToPool.clear();
+ this.leased.clear();
+ this.pending.clear();
+ this.available.clear();
+ this.leasingRequests.clear();
+ this.ioreactor.shutdown(waitMs);
+ } finally {
+ this.lock.unlock();
}
- this.routeToPool.clear();
- this.leased.clear();
- this.pending.clear();
- this.available.clear();
- this.leasingRequests.clear();
- this.ioreactor.shutdown(waitMs);
- } finally {
- this.lock.unlock();
}
}
@@ -239,20 +245,24 @@ public abstract class AbstractNIOConnPoo
final FutureCallback<E> callback) {
Args.notNull(route, "Route");
Args.notNull(tunit, "Time unit");
- Asserts.check(!this.isShutDown, "Connection pool shut down");
+ Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
+ final BasicFuture<E> future = new BasicFuture<E>(callback);
this.lock.lock();
try {
final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
- final BasicFuture<E> future = new BasicFuture<E>(callback);
final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, leaseTimeout, future);
final boolean completed = processPendingRequest(request);
if (!request.isDone() && !completed) {
this.leasingRequests.add(request);
}
- return future;
+ if (request.isDone()) {
+ this.completedRequests.add(request);
+ }
} finally {
this.lock.unlock();
}
+ fireCallbacks();
+ return future;
}
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
@@ -267,7 +277,7 @@ public abstract class AbstractNIOConnPoo
if (entry == null) {
return;
}
- if (this.isShutDown) {
+ if (this.isShutDown.get()) {
return;
}
this.lock.lock();
@@ -285,16 +295,20 @@ public abstract class AbstractNIOConnPoo
} finally {
this.lock.unlock();
}
+ fireCallbacks();
}
private void processPendingRequests() {
final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
while (it.hasNext()) {
final LeaseRequest<T, C, E> request = it.next();
- processPendingRequest(request);
- if (request.isDone()) {
+ final boolean completed = processPendingRequest(request);
+ if (request.isDone() || completed) {
it.remove();
}
+ if (request.isDone()) {
+ this.completedRequests.add(request);
+ }
}
}
@@ -306,6 +320,9 @@ public abstract class AbstractNIOConnPoo
if (request.isDone() || completed) {
it.remove();
}
+ if (request.isDone()) {
+ this.completedRequests.add(request);
+ }
if (completed) {
return;
}
@@ -316,11 +333,10 @@ public abstract class AbstractNIOConnPoo
final T route = request.getRoute();
final Object state = request.getState();
final long deadline = request.getDeadline();
- final BasicFuture<E> future = request.getFuture();
final long now = System.currentTimeMillis();
if (now > deadline) {
- future.failed(new TimeoutException());
+ request.failed(new TimeoutException());
return false;
}
@@ -342,7 +358,7 @@ public abstract class AbstractNIOConnPoo
if (entry != null) {
this.available.remove(entry);
this.leased.add(entry);
- future.completed(entry);
+ request.completed(entry);
return true;
}
@@ -384,7 +400,7 @@ public abstract class AbstractNIOConnPoo
remoteAddress = this.addressResolver.resolveRemoteAddress(route);
localAddress = this.addressResolver.resolveLocalAddress(route);
} catch (final IOException ex) {
- future.failed(ex);
+ request.failed(ex);
return false;
}
@@ -394,13 +410,29 @@ public abstract class AbstractNIOConnPoo
(int) request.getConnectTimeout() : Integer.MAX_VALUE;
sessionRequest.setConnectTimeout(timout);
this.pending.add(sessionRequest);
- pool.addPending(sessionRequest, future);
+ pool.addPending(sessionRequest, request.getFuture());
return true;
} else {
return false;
}
}
+ private void fireCallbacks() {
+ LeaseRequest<T, C, E> request;
+ while ((request = this.completedRequests.poll()) != null) {
+ final BasicFuture<E> future = request.getFuture();
+ final Exception ex = request.getException();
+ final E result = request.getResult();
+ if (ex != null) {
+ future.failed(ex);
+ } else if (result != null) {
+ future.completed(result);
+ } else {
+ future.cancel();
+ }
+ }
+ }
+
public void validatePendingRequests() {
this.lock.lock();
try {
@@ -411,17 +443,18 @@ public abstract class AbstractNIOConnPoo
final long deadline = request.getDeadline();
if (now > deadline) {
it.remove();
- final BasicFuture<E> future = request.getFuture();
- future.failed(new TimeoutException());
+ request.failed(new TimeoutException());
+ this.completedRequests.add(request);
}
}
} finally {
this.lock.unlock();
}
+ fireCallbacks();
}
protected void requestCompleted(final SessionRequest request) {
- if (this.isShutDown) {
+ if (this.isShutDown.get()) {
return;
}
@SuppressWarnings("unchecked")
@@ -444,10 +477,11 @@ public abstract class AbstractNIOConnPoo
} finally {
this.lock.unlock();
}
+ fireCallbacks();
}
protected void requestCancelled(final SessionRequest request) {
- if (this.isShutDown) {
+ if (this.isShutDown.get()) {
return;
}
@SuppressWarnings("unchecked")
@@ -462,10 +496,11 @@ public abstract class AbstractNIOConnPoo
} finally {
this.lock.unlock();
}
+ fireCallbacks();
}
protected void requestFailed(final SessionRequest request) {
- if (this.isShutDown) {
+ if (this.isShutDown.get()) {
return;
}
@SuppressWarnings("unchecked")
@@ -480,10 +515,11 @@ public abstract class AbstractNIOConnPoo
} finally {
this.lock.unlock();
}
+ fireCallbacks();
}
protected void requestTimeout(final SessionRequest request) {
- if (this.isShutDown) {
+ if (this.isShutDown.get()) {
return;
}
@SuppressWarnings("unchecked")
@@ -498,6 +534,7 @@ public abstract class AbstractNIOConnPoo
} finally {
this.lock.unlock();
}
+ fireCallbacks();
}
private int getMax(final T route) {
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java?rev=1493390&r1=1493389&r2=1493390&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java Sat Jun 15 16:21:06 2013
@@ -30,6 +30,8 @@ import org.apache.http.annotation.Immuta
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.pool.PoolEntry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
@Immutable
class LeaseRequest<T, C, E extends PoolEntry<T, C>> {
@@ -38,6 +40,9 @@ class LeaseRequest<T, C, E extends PoolE
private final long connectTimeout;
private final long deadline;
private final BasicFuture<E> future;
+ private final AtomicBoolean completed;
+ private volatile E result;
+ private volatile Exception ex;
/**
* Constructor
@@ -60,6 +65,7 @@ class LeaseRequest<T, C, E extends PoolE
this.deadline = leaseTimeout > 0 ? System.currentTimeMillis() + leaseTimeout :
Long.MAX_VALUE;
this.future = future;
+ this.completed = new AtomicBoolean(false);
}
public T getRoute() {
@@ -78,12 +84,32 @@ class LeaseRequest<T, C, E extends PoolE
return this.deadline;
}
+ public boolean isDone() {
+ return this.completed.get();
+ }
+
+ public void failed(final Exception ex) {
+ if (this.completed.compareAndSet(false, true)) {
+ this.ex = ex;
+ }
+ }
+
+ public void completed(final E result) {
+ if (this.completed.compareAndSet(false, true)) {
+ this.result = result;
+ }
+ }
+
public BasicFuture<E> getFuture() {
return this.future;
}
- public boolean isDone() {
- return this.future.isDone();
+ public E getResult() {
+ return this.result;
+ }
+
+ public Exception getException() {
+ return this.ex;
}
@Override