You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2016/11/22 19:46:29 UTC
svn commit: r1770878 - in /httpcomponents/httpcore/branches/4.4.x/httpcore/src: main/java/org/apache/http/pool/ test/java/org/apache/http/pool/
Author: olegk
Date: Tue Nov 22 19:46:29 2016
New Revision: 1770878
URL: http://svn.apache.org/viewvc?rev=1770878&view=rev
Log:
HTTPCORE-433: redesign of connection request future used by blocking AbstractConnPool
Removed:
httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/PoolEntryFuture.java
Modified:
httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java
httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java
httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java
Modified: httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java?rev=1770878&r1=1770877&r2=1770878&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java (original)
+++ httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java Tue Nov 22 19:46:29 2016
@@ -34,14 +34,16 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.http.annotation.ThreadingBehavior;
import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.util.Args;
import org.apache.http.util.Asserts;
@@ -66,11 +68,12 @@ public abstract class AbstractConnPool<T
implements ConnPool<T, E>, ConnPoolControl<T> {
private final Lock lock;
+ private final Condition condition;
private final ConnFactory<T, C> connFactory;
private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
private final Set<E> leased;
private final LinkedList<E> available;
- private final LinkedList<PoolEntryFuture<E>> pending;
+ private final LinkedList<Future<E>> pending;
private final Map<T, Integer> maxPerRoute;
private volatile boolean isShutDown;
@@ -87,10 +90,11 @@ public abstract class AbstractConnPool<T
this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
this.maxTotal = Args.positive(maxTotal, "Max total value");
this.lock = new ReentrantLock();
+ this.condition = this.lock.newCondition();
this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
this.leased = new HashSet<E>();
this.available = new LinkedList<E>();
- this.pending = new LinkedList<PoolEntryFuture<E>>();
+ this.pending = new LinkedList<Future<E>>();
this.maxPerRoute = new HashMap<T, Integer>();
}
@@ -183,16 +187,77 @@ public abstract class AbstractConnPool<T
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
Args.notNull(route, "Route");
Asserts.check(!this.isShutDown, "Connection pool shut down");
- return new PoolEntryFuture<E>(this.lock, callback) {
+
+ return new Future<E>() {
+
+ private volatile boolean cancelled;
+ private volatile boolean done;
+ private volatile E entry;
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ cancelled = true;
+ lock.lock();
+ try {
+ condition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ synchronized (this) {
+ final boolean result = !done;
+ done = true;
+ if (callback != null) {
+ callback.cancelled();
+ }
+ return result;
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
@Override
- public E getPoolEntry(
- final long timeout,
- final TimeUnit tunit)
- throws InterruptedException, TimeoutException, IOException {
- final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
- onLease(entry);
- return entry;
+ public E get() throws InterruptedException, ExecutionException {
+ try {
+ return get(0L, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException ex) {
+ throw new ExecutionException(ex);
+ }
+ }
+
+ @Override
+ public E get(final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, TimeoutException {
+ final E local = entry;
+ if (local != null) {
+ return local;
+ }
+ synchronized (this) {
+ try {
+ if (entry != null) {
+ return entry;
+ }
+ entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
+ done = true;
+ onLease(entry);
+ if (callback != null) {
+ callback.completed(entry);
+ }
+ return entry;
+ } catch (IOException ex) {
+ done = true;
+ if (callback != null) {
+ callback.failed(ex);
+ }
+ throw new ExecutionException(ex);
+ }
+ }
}
};
@@ -221,8 +286,7 @@ public abstract class AbstractConnPool<T
private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit tunit,
- final PoolEntryFuture<E> future)
- throws IOException, InterruptedException, TimeoutException {
+ final Future<E> future) throws IOException, InterruptedException, TimeoutException {
Date deadline = null;
if (timeout > 0) {
@@ -302,9 +366,20 @@ public abstract class AbstractConnPool<T
boolean success = false;
try {
+ if (future.isCancelled()) {
+ throw new InterruptedException("Operation interrupted");
+ }
pool.queue(future);
this.pending.add(future);
- success = future.await(deadline);
+ if (deadline != null) {
+ success = this.condition.awaitUntil(deadline);
+ } else {
+ this.condition.await();
+ success = true;
+ }
+ if (future.isCancelled()) {
+ throw new InterruptedException("Operation interrupted");
+ }
} finally {
// In case of 'success', we were woken up by the
// connection pool and should now have a connection
@@ -338,14 +413,14 @@ public abstract class AbstractConnPool<T
entry.close();
}
onRelease(entry);
- PoolEntryFuture<E> future = pool.nextPending();
+ Future<E> future = pool.nextPending();
if (future != null) {
this.pending.remove(future);
} else {
future = this.pending.poll();
}
if (future != null) {
- future.wakeup();
+ this.condition.signalAll();
}
}
} finally {
Modified: httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java?rev=1770878&r1=1770877&r2=1770878&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java (original)
+++ httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java Tue Nov 22 19:46:29 2016
@@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
+import java.util.concurrent.Future;
import org.apache.http.util.Args;
import org.apache.http.util.Asserts;
@@ -39,14 +40,14 @@ abstract class RouteSpecificPool<T, C, E
private final T route;
private final Set<E> leased;
private final LinkedList<E> available;
- private final LinkedList<PoolEntryFuture<E>> pending;
+ private final LinkedList<Future<E>> pending;
RouteSpecificPool(final T route) {
super();
this.route = route;
this.leased = new HashSet<E>();
this.available = new LinkedList<E>();
- this.pending = new LinkedList<PoolEntryFuture<E>>();
+ this.pending = new LinkedList<Future<E>>();
}
protected abstract E createEntry(C conn);
@@ -130,18 +131,18 @@ abstract class RouteSpecificPool<T, C, E
return entry;
}
- public void queue(final PoolEntryFuture<E> future) {
+ public void queue(final Future<E> future) {
if (future == null) {
return;
}
this.pending.add(future);
}
- public PoolEntryFuture<E> nextPending() {
+ public Future<E> nextPending() {
return this.pending.poll();
}
- public void unqueue(final PoolEntryFuture<E> future) {
+ public void unqueue(final Future<E> future) {
if (future == null) {
return;
}
@@ -150,7 +151,7 @@ abstract class RouteSpecificPool<T, C, E
}
public void shutdown() {
- for (final PoolEntryFuture<E> future: this.pending) {
+ for (final Future<E> future: this.pending) {
future.cancel(true);
}
this.pending.clear();
Modified: httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java?rev=1770878&r1=1770877&r2=1770878&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java (original)
+++ httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java Tue Nov 22 19:46:29 2016
@@ -27,6 +27,7 @@
package org.apache.http.pool;
import java.io.IOException;
+import java.util.concurrent.Future;
import org.apache.http.HttpConnection;
import org.junit.Assert;
@@ -278,9 +279,9 @@ public class TestRouteSpecificPool {
public void testWaitingThreadQueuing() throws Exception {
final LocalRoutePool pool = new LocalRoutePool();
@SuppressWarnings("unchecked")
- final PoolEntryFuture<LocalPoolEntry> future1 = Mockito.mock(PoolEntryFuture.class);
+ final Future<LocalPoolEntry> future1 = Mockito.mock(Future.class);
@SuppressWarnings("unchecked")
- final PoolEntryFuture<LocalPoolEntry> future2 = Mockito.mock(PoolEntryFuture.class);
+ final Future<LocalPoolEntry> future2 = Mockito.mock(Future.class);
Assert.assertEquals(0, pool.getPendingCount());
pool.queue(future1);
@@ -308,7 +309,7 @@ public class TestRouteSpecificPool {
final LocalPoolEntry entry2 = pool.add(conn2);
@SuppressWarnings("unchecked")
- final PoolEntryFuture<LocalPoolEntry> future1 = Mockito.mock(PoolEntryFuture.class);
+ final Future<LocalPoolEntry> future1 = Mockito.mock(Future.class);
pool.queue(future1);
Assert.assertNotNull(entry1);