You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2016/11/22 19:46:29 UTC

svn commit: r1770878 - in /httpcomponents/httpcore/branches/4.4.x/httpcore/src: main/java/org/apache/http/pool/ test/java/org/apache/http/pool/

Author: olegk
Date: Tue Nov 22 19:46:29 2016
New Revision: 1770878

URL: http://svn.apache.org/viewvc?rev=1770878&view=rev
Log:
HTTPCORE-433: redesign of connection request future used by blocking AbstractConnPool

Removed:
    httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/PoolEntryFuture.java
Modified:
    httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java
    httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java
    httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java

Modified: httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java?rev=1770878&r1=1770877&r2=1770878&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java (original)
+++ httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java Tue Nov 22 19:46:29 2016
@@ -34,14 +34,16 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.http.annotation.ThreadingBehavior;
 import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
 import org.apache.http.concurrent.FutureCallback;
 import org.apache.http.util.Args;
 import org.apache.http.util.Asserts;
@@ -66,11 +68,12 @@ public abstract class AbstractConnPool<T
                                                implements ConnPool<T, E>, ConnPoolControl<T> {
 
     private final Lock lock;
+    private final Condition condition;
     private final ConnFactory<T, C> connFactory;
     private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
     private final Set<E> leased;
     private final LinkedList<E> available;
-    private final LinkedList<PoolEntryFuture<E>> pending;
+    private final LinkedList<Future<E>> pending;
     private final Map<T, Integer> maxPerRoute;
 
     private volatile boolean isShutDown;
@@ -87,10 +90,11 @@ public abstract class AbstractConnPool<T
         this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
         this.maxTotal = Args.positive(maxTotal, "Max total value");
         this.lock = new ReentrantLock();
+        this.condition = this.lock.newCondition();
         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
         this.leased = new HashSet<E>();
         this.available = new LinkedList<E>();
-        this.pending = new LinkedList<PoolEntryFuture<E>>();
+        this.pending = new LinkedList<Future<E>>();
         this.maxPerRoute = new HashMap<T, Integer>();
     }
 
@@ -183,16 +187,77 @@ public abstract class AbstractConnPool<T
     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
         Args.notNull(route, "Route");
         Asserts.check(!this.isShutDown, "Connection pool shut down");
-        return new PoolEntryFuture<E>(this.lock, callback) {
+
+        return new Future<E>() {
+
+            private volatile boolean cancelled;
+            private volatile boolean done;
+            private volatile E entry;
+
+            @Override
+            public boolean cancel(final boolean mayInterruptIfRunning) {
+                cancelled = true;
+                lock.lock();
+                try {
+                    condition.signalAll();
+                } finally {
+                    lock.unlock();
+                }
+                synchronized (this) {
+                    final boolean result = !done;
+                    done = true;
+                    if (callback != null) {
+                        callback.cancelled();
+                    }
+                    return result;
+                }
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return cancelled;
+            }
+
+            @Override
+            public boolean isDone() {
+                return done;
+            }
 
             @Override
-            public E getPoolEntry(
-                    final long timeout,
-                    final TimeUnit tunit)
-                        throws InterruptedException, TimeoutException, IOException {
-                final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
-                onLease(entry);
-                return entry;
+            public E get() throws InterruptedException, ExecutionException {
+                try {
+                    return get(0L, TimeUnit.MILLISECONDS);
+                } catch (TimeoutException ex) {
+                    throw new ExecutionException(ex);
+                }
+            }
+
+            @Override
+            public E get(final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, TimeoutException {
+                final E local = entry;
+                if (local != null) {
+                    return local;
+                }
+                synchronized (this) {
+                    try {
+                        if (entry != null) {
+                            return entry;
+                        }
+                        entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
+                        done = true;
+                        onLease(entry);
+                        if (callback != null) {
+                            callback.completed(entry);
+                        }
+                        return entry;
+                    } catch (IOException ex) {
+                        done = true;
+                        if (callback != null) {
+                            callback.failed(ex);
+                        }
+                        throw new ExecutionException(ex);
+                    }
+                }
             }
 
         };
@@ -221,8 +286,7 @@ public abstract class AbstractConnPool<T
     private E getPoolEntryBlocking(
             final T route, final Object state,
             final long timeout, final TimeUnit tunit,
-            final PoolEntryFuture<E> future)
-                throws IOException, InterruptedException, TimeoutException {
+            final Future<E> future) throws IOException, InterruptedException, TimeoutException {
 
         Date deadline = null;
         if (timeout > 0) {
@@ -302,9 +366,20 @@ public abstract class AbstractConnPool<T
 
                 boolean success = false;
                 try {
+                    if (future.isCancelled()) {
+                        throw new InterruptedException("Operation interrupted");
+                    }
                     pool.queue(future);
                     this.pending.add(future);
-                    success = future.await(deadline);
+                    if (deadline != null) {
+                        success = this.condition.awaitUntil(deadline);
+                    } else {
+                        this.condition.await();
+                        success = true;
+                    }
+                    if (future.isCancelled()) {
+                        throw new InterruptedException("Operation interrupted");
+                    }
                 } finally {
                     // In case of 'success', we were woken up by the
                     // connection pool and should now have a connection
@@ -338,14 +413,14 @@ public abstract class AbstractConnPool<T
                     entry.close();
                 }
                 onRelease(entry);
-                PoolEntryFuture<E> future = pool.nextPending();
+                Future<E> future = pool.nextPending();
                 if (future != null) {
                     this.pending.remove(future);
                 } else {
                     future = this.pending.poll();
                 }
                 if (future != null) {
-                    future.wakeup();
+                    this.condition.signalAll();
                 }
             }
         } finally {

Modified: httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java?rev=1770878&r1=1770877&r2=1770878&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java (original)
+++ httpcomponents/httpcore/branches/4.4.x/httpcore/src/main/java/org/apache/http/pool/RouteSpecificPool.java Tue Nov 22 19:46:29 2016
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.Future;
 
 import org.apache.http.util.Args;
 import org.apache.http.util.Asserts;
@@ -39,14 +40,14 @@ abstract class RouteSpecificPool<T, C, E
     private final T route;
     private final Set<E> leased;
     private final LinkedList<E> available;
-    private final LinkedList<PoolEntryFuture<E>> pending;
+    private final LinkedList<Future<E>> pending;
 
     RouteSpecificPool(final T route) {
         super();
         this.route = route;
         this.leased = new HashSet<E>();
         this.available = new LinkedList<E>();
-        this.pending = new LinkedList<PoolEntryFuture<E>>();
+        this.pending = new LinkedList<Future<E>>();
     }
 
     protected abstract E createEntry(C conn);
@@ -130,18 +131,18 @@ abstract class RouteSpecificPool<T, C, E
         return entry;
     }
 
-    public void queue(final PoolEntryFuture<E> future) {
+    public void queue(final Future<E> future) {
         if (future == null) {
             return;
         }
         this.pending.add(future);
     }
 
-    public PoolEntryFuture<E> nextPending() {
+    public Future<E> nextPending() {
         return this.pending.poll();
     }
 
-    public void unqueue(final PoolEntryFuture<E> future) {
+    public void unqueue(final Future<E> future) {
         if (future == null) {
             return;
         }
@@ -150,7 +151,7 @@ abstract class RouteSpecificPool<T, C, E
     }
 
     public void shutdown() {
-        for (final PoolEntryFuture<E> future: this.pending) {
+        for (final Future<E> future: this.pending) {
             future.cancel(true);
         }
         this.pending.clear();

Modified: httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java?rev=1770878&r1=1770877&r2=1770878&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java (original)
+++ httpcomponents/httpcore/branches/4.4.x/httpcore/src/test/java/org/apache/http/pool/TestRouteSpecificPool.java Tue Nov 22 19:46:29 2016
@@ -27,6 +27,7 @@
 package org.apache.http.pool;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 
 import org.apache.http.HttpConnection;
 import org.junit.Assert;
@@ -278,9 +279,9 @@ public class TestRouteSpecificPool {
     public void testWaitingThreadQueuing() throws Exception {
         final LocalRoutePool pool = new LocalRoutePool();
         @SuppressWarnings("unchecked")
-        final PoolEntryFuture<LocalPoolEntry> future1 = Mockito.mock(PoolEntryFuture.class);
+        final Future<LocalPoolEntry> future1 = Mockito.mock(Future.class);
         @SuppressWarnings("unchecked")
-        final PoolEntryFuture<LocalPoolEntry> future2 = Mockito.mock(PoolEntryFuture.class);
+        final Future<LocalPoolEntry> future2 = Mockito.mock(Future.class);
 
         Assert.assertEquals(0, pool.getPendingCount());
         pool.queue(future1);
@@ -308,7 +309,7 @@ public class TestRouteSpecificPool {
         final LocalPoolEntry entry2 = pool.add(conn2);
 
         @SuppressWarnings("unchecked")
-        final PoolEntryFuture<LocalPoolEntry> future1 = Mockito.mock(PoolEntryFuture.class);
+        final Future<LocalPoolEntry> future1 = Mockito.mock(Future.class);
         pool.queue(future1);
 
         Assert.assertNotNull(entry1);