You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by ma...@apache.org on 2009/05/18 00:58:26 UTC

svn commit: r775775 - /commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java

Author: markt
Date: Sun May 17 22:58:25 2009
New Revision: 775775

URL: http://svn.apache.org/viewvc?rev=775775&view=rev
Log:
Fix POOL-75 for GKOP. Objects are now allocated to threads in the order in which the threads made their request.

Modified:
    commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java

Modified: commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java
URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java?rev=775775&r1=775774&r2=775775&view=diff
==============================================================================
--- commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java (original)
+++ commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java Sun May 17 22:58:25 2009
@@ -21,6 +21,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -562,7 +563,7 @@
      */
     public synchronized void setMaxActive(int maxActive) {
         _maxActive = maxActive;
-        notifyAll();
+        allocate();
     }
 
     /**
@@ -589,7 +590,7 @@
      */
     public synchronized void setMaxTotal(int maxTotal) {
         _maxTotal = maxTotal;
-        notifyAll();
+        allocate();
     }
 
     /**
@@ -621,7 +622,7 @@
             case WHEN_EXHAUSTED_FAIL:
             case WHEN_EXHAUSTED_GROW:
                 _whenExhaustedAction = whenExhaustedAction;
-                notifyAll();
+                allocate();
                 break;
             default:
                 throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized.");
@@ -693,7 +694,7 @@
      */
     public synchronized void setMaxIdle(int maxIdle) {
         _maxIdle = maxIdle;
-        notifyAll();
+        allocate();
     }
 
     /**
@@ -944,51 +945,37 @@
 
     public Object borrowObject(Object key) throws Exception {
         long starttime = System.currentTimeMillis();
+        Latch latch = new Latch(key);
+        synchronized (this) {
+            _allocationQueue.add(latch);
+            allocate();
+        }
+        
         for(;;) {
-            ObjectTimestampPair pair = null;
-            ObjectQueue pool = null;
             synchronized (this) {
                 assertOpen();
-                pool = (ObjectQueue)(_poolMap.get(key));
-                if(null == pool) {
-                    pool = new ObjectQueue();
-                    _poolMap.put(key,pool);
-                    _poolList.add(key);
-                }
-                // if there are any sleeping, just grab one of those
-                try {
-                    pair = (ObjectTimestampPair)(pool.queue.removeFirst());
-                    if(null != pair) {
-                        _totalIdle--;
-                    }
-                } catch(NoSuchElementException e) { /* ignored */
-                }
-                // otherwise
-                if(null == pair) {
-                    // if there is a totalMaxActive and we are at the limit then
-                    // we have to make room
-                    if ((_maxTotal > 0)
-                            && (_totalActive + _totalIdle + _totalInternalProcessing >= _maxTotal)) {
-                        clearOldest();
-                    }
-    
-                    // check if we can create one
-                    // (note we know that the num sleeping is 0, else we wouldn't be here)
-                    if ((_maxActive < 0 || pool.activeCount + pool.internalProcessingCount < _maxActive) &&
-                        (_maxTotal < 0 || _totalActive + _totalIdle + _totalInternalProcessing < _maxTotal)) {
+            }
+            // If no object was allocated
+            if (null == latch._pair) {
+                // Check to see if we were allowed to create one
+                if (latch._mayCreate) {
+                    // allow new object to be created
+                } else {
+                    // the pool is exhausted
+                    switch(_whenExhaustedAction) {
+                        case WHEN_EXHAUSTED_GROW:
                         // allow new object to be created
-                    } else {
-                        // the pool is exhausted
-                        switch(_whenExhaustedAction) {
-                            case WHEN_EXHAUSTED_GROW:
-                                // allow new object to be created
-                                break;
-                            case WHEN_EXHAUSTED_FAIL:
-                                throw new NoSuchElementException("Pool exhausted");
-                            case WHEN_EXHAUSTED_BLOCK:
-                                try {
+                        break;
+                        case WHEN_EXHAUSTED_FAIL:
+                            synchronized (this) {
+                                _allocationQueue.remove(latch);
+                            }
+                            throw new NoSuchElementException("Pool exhausted");
+                        case WHEN_EXHAUSTED_BLOCK:
+                            try {
+                                synchronized (latch) {
                                     if(_maxWait <= 0) {
-                                        wait();
+                                        latch.wait();
                                     } else {
                                         // this code may be executed again after a notify then continue cycle
                                         // so, need to calculate the amount of time to wait
@@ -996,38 +983,38 @@
                                         final long waitTime = _maxWait - elapsed;
                                         if (waitTime > 0)
                                         {
-                                            wait(waitTime);
+                                            latch.wait(waitTime);
                                         }
                                     }
-                                } catch(InterruptedException e) {
-                                    Thread.currentThread().interrupt();
-                                    throw e;
                                 }
-                                if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) {
-                                    throw new NoSuchElementException("Timeout waiting for idle object");
-                                } else {
-                                    continue; // keep looping
+                            } catch(InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                throw e;
                                 }
-                            default:
-                                throw new IllegalArgumentException("whenExhaustedAction " + _whenExhaustedAction + " not recognized.");
-                        }
+                            if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) {
+                                throw new NoSuchElementException("Timeout waiting for idle object");
+                            } else {
+                                continue; // keep looping
+                            }
+                        default:
+                            throw new IllegalArgumentException("whenExhaustedAction " + _whenExhaustedAction + " not recognized.");
                     }
                 }
-                pool.incrementActiveCount();
             }
 
             boolean newlyCreated = false;
-            if (null == pair) {
+            if (null == latch._pair) {
                 try {
                     Object obj = _factory.makeObject(key);
-                    pair = new ObjectTimestampPair(obj);
+                    latch._pair = new ObjectTimestampPair(obj);
                     newlyCreated = true;
                 } finally {
                     if (!newlyCreated) {
                         // object cannot be created
                         synchronized (this) {
-                            pool.decrementActiveCount();
-                            notifyAll();
+                            latch._pool.decrementInternalProcessingCount();
+                            // No need to reset latch - about to throw exception
+                            allocate();
                         }
                     }
                 }
@@ -1035,21 +1022,27 @@
 
             // activate & validate the object
             try {
-                _factory.activateObject(key, pair.value);
-                if (_testOnBorrow && !_factory.validateObject(key, pair.value)) {
+                _factory.activateObject(key, latch._pair.value);
+                if (_testOnBorrow && !_factory.validateObject(key, latch._pair.value)) {
                     throw new Exception("ValidateObject failed");
                 }
-                return pair.value;
+                synchronized (this) {
+                    latch._pool.decrementInternalProcessingCount();
+                    latch._pool.incrementActiveCount();
+                }
+                return latch._pair.value;
             } catch (Throwable e) {
                 // object cannot be activated or is invalid
                 try {
-                    _factory.destroyObject(key,pair.value);
+                    _factory.destroyObject(key, latch._pair.value);
                 } catch (Throwable e2) {
                     // cannot destroy broken object
                 }
                 synchronized (this) {
-                    pool.decrementActiveCount();
-                    notifyAll();
+                    latch._pool.decrementInternalProcessingCount();
+                    latch.reset();
+                    _allocationQueue.add(0, latch);
+                    allocate();
                 }
                 if(newlyCreated) {
                     throw new NoSuchElementException(
@@ -1063,6 +1056,58 @@
         }
     }
 
+    private synchronized void allocate() {
+        if (isClosed()) return;
+
+        for (;;) {
+            if (!_allocationQueue.isEmpty()) {
+                // First use any objects in the pool to clear the queue
+                Latch latch = (Latch) _allocationQueue.getFirst();
+                ObjectQueue pool = (ObjectQueue)(_poolMap.get(latch._key));
+                if(null == pool) {
+                    pool = new ObjectQueue();
+                    _poolMap.put(latch._key, pool);
+                    _poolList.add(latch._key);
+                }
+                latch._pool = pool;
+                if (!pool.queue.isEmpty()) {
+                    _allocationQueue.removeFirst();
+                    latch._pair = (ObjectTimestampPair) pool.queue.removeFirst();
+                    pool.incrementInternalProcessingCount();
+                    _totalIdle--;
+                    synchronized (latch) {
+                        latch.notify();
+                    }
+                    // Next item in queue
+                    continue;
+                }
+                
+                // If there is a maxTotal limit and we have reached it then
+                // we have to make room
+                if ((_maxTotal > 0)
+                        && (_totalActive + _totalIdle + _totalInternalProcessing >= _maxTotal)) {
+                    clearOldest();
+                }
+
+
+                // Second utilise any spare capacity to create new objects
+                if ((_maxActive < 0 || pool.activeCount + pool.internalProcessingCount < _maxActive) &&
+                        (_maxTotal < 0 || _totalActive + _totalIdle + _totalInternalProcessing < _maxTotal)) {
+                    // allow new object to be created
+                    _allocationQueue.removeFirst();
+                    latch._mayCreate = true;
+                    pool.incrementInternalProcessingCount();
+                    synchronized (latch) {
+                        latch.notify();
+                    }
+                    // Next item in queue
+                    continue;
+                }
+            }
+            break;
+        }
+    }
+
     /**
      * Clears the pool, removing all pooled instances.
      */
@@ -1184,7 +1229,7 @@
                 } finally {
                     synchronized(this) {
                         _totalInternalProcessing--;
-                        notifyAll();
+                        allocate();
                     }
                 }
             }
@@ -1250,7 +1295,7 @@
                 if (pool != null) {
                     synchronized(this) {
                         pool.decrementActiveCount();
-                        notifyAll();
+                        allocate();
                     }  
                 }
             }
@@ -1301,7 +1346,7 @@
                     if (decrementNumActive) {
                         pool.decrementActiveCount();
                     }
-                    notifyAll();
+                    allocate();
                 }
             }
         }
@@ -1317,7 +1362,7 @@
             if (decrementNumActive) {
                 synchronized(this) {
                     pool.decrementActiveCount();
-                    notifyAll();
+                    allocate();
                 }
             }
         }
@@ -1335,7 +1380,7 @@
                     _poolList.add(key);
                 }
                 pool.decrementActiveCount();
-                notifyAll(); // _totalActive has changed
+                allocate(); // _totalActive has changed
             }
         }
     }
@@ -1696,7 +1741,7 @@
             } finally {
                 synchronized (this) {
                     pool.decrementInternalProcessingCount();
-                    notifyAll();
+                    allocate();
                 }
             }
         }
@@ -1941,6 +1986,31 @@
         public boolean lifo = GenericKeyedObjectPool.DEFAULT_LIFO;
     }
 
+    /**
+     * Latch used to control allocation order of objects to threads to ensure
+     * fairness, i.e. objects are allocated to threads in the order that threads
+     * request objects.
+     */
+    private static final class Latch {
+        Object _key;
+        ObjectQueue _pool;
+        ObjectTimestampPair _pair;
+        boolean _mayCreate = false;
+
+        private Latch(Object key) {
+            _key = key;
+        }
+
+        /**
+         * Reset the latch data. Used when an allocation fails and the latch
+         * needs to be re-added to the queue. 
+         */
+        private void reset() {
+            _pair = null;
+            _mayCreate = false;
+        }
+    }
+
     //--- protected attributes ---------------------------------------
 
     /**
@@ -2115,4 +2185,12 @@
     
     /** Whether or not the pools behave as LIFO queues (last in first out) */
     private boolean _lifo = DEFAULT_LIFO;
+    
+    /**
+     * Used to track the order in which threads call {@link #borrowObject(Object)} so
+     * that objects can be allocated in the order in which the threads requested
+     * them.
+     */
+    private LinkedList _allocationQueue = new LinkedList();
+
 }