You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by ma...@apache.org on 2009/05/17 18:42:34 UTC

svn commit: r775707 - /commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java

Author: markt
Date: Sun May 17 16:42:34 2009
New Revision: 775707

URL: http://svn.apache.org/viewvc?rev=775707&view=rev
Log:
Fix POOL-75 for GOP. Ensure objects are allocated in the order in which threads ask for them.

Modified:
    commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java

Modified: commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java
URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java?rev=775707&r1=775706&r2=775707&view=diff
==============================================================================
--- commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java (original)
+++ commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java Sun May 17 16:42:34 2009
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.TimerTask;
@@ -539,7 +540,7 @@
      */
     public synchronized void setMaxActive(int maxActive) {
         _maxActive = maxActive;
-        notifyAll();
+        allocate();
     }
 
     /**
@@ -570,7 +571,7 @@
             case WHEN_EXHAUSTED_FAIL:
             case WHEN_EXHAUSTED_GROW:
                 _whenExhaustedAction = whenExhaustedAction;
-                notifyAll();
+                allocate();
                 break;
             default:
                 throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized.");
@@ -614,7 +615,7 @@
      */
     public synchronized void setMaxWait(long maxWait) {
         _maxWait = maxWait;
-        notifyAll();
+        allocate();
     }
 
     /**
@@ -641,7 +642,7 @@
      */
     public synchronized void setMaxIdle(int maxIdle) {
         _maxIdle = maxIdle;
-        notifyAll();
+        allocate();
     }
 
     /**
@@ -658,7 +659,7 @@
      */
     public synchronized void setMinIdle(int minIdle) {
         _minIdle = minIdle;
-        notifyAll();
+        allocate();
     }
 
     /**
@@ -917,43 +918,42 @@
         setTimeBetweenEvictionRunsMillis(conf.timeBetweenEvictionRunsMillis);
         setSoftMinEvictableIdleTimeMillis(conf.softMinEvictableIdleTimeMillis);
         setLifo(conf.lifo);
-        notifyAll();
+        allocate();
     }
 
     //-- ObjectPool methods ------------------------------------------
 
     public Object borrowObject() throws Exception {
         long starttime = System.currentTimeMillis();
+        Latch latch = new Latch();
+        synchronized (this) {
+            _allocationQueue.add(latch);
+            allocate();
+        }
+
         for(;;) {
-            ObjectTimestampPair pair = null;
-            
             synchronized (this) {
                 assertOpen();
-                // if there are any sleeping, just grab one of those
-                try {
-                    pair = (ObjectTimestampPair)(_pool.removeFirst());
-                } catch(NoSuchElementException e) {
-                    /* ignored */
-                }
-    
-                // otherwise
-                if(null == pair) {
-                    // check if we can create one
-                    // (note we know that the num sleeping is 0, else we wouldn't be here)
-                    if(_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive) {
-                        // allow new object to be created
-                    } else {
-                        // the pool is exhausted
-                        switch(_whenExhaustedAction) {
-                            case WHEN_EXHAUSTED_GROW:
-                                // allow new object to be created
-                                break;
-                            case WHEN_EXHAUSTED_FAIL:
-                                throw new NoSuchElementException("Pool exhausted");
-                            case WHEN_EXHAUSTED_BLOCK:
-                                try {
+            }
+                
+            // If no object was allocated from the pool above
+            if(latch._pair == null) {
+                // check if we were allowed to create one
+                if(latch._mayCreate) {
+                    // allow new object to be created
+                } else {
+                    // the pool is exhausted
+                    switch(_whenExhaustedAction) {
+                        case WHEN_EXHAUSTED_GROW:
+                            // allow new object to be created
+                            break;
+                        case WHEN_EXHAUSTED_FAIL:
+                            throw new NoSuchElementException("Pool exhausted");
+                        case WHEN_EXHAUSTED_BLOCK:
+                            try {
+                                synchronized (latch) {
                                     if(_maxWait <= 0) {
-                                        wait();
+                                        latch.wait();
                                     } else {
                                         // this code may be executed again after a notify then continue cycle
                                         // so, need to calculate the amount of time to wait
@@ -961,62 +961,66 @@
                                         final long waitTime = _maxWait - elapsed;
                                         if (waitTime > 0)
                                         {
-                                            wait(waitTime);
+                                            latch.wait(waitTime);
                                         }
                                     }
-                                } catch(InterruptedException e) {
-                                    Thread.currentThread().interrupt();
-                                    throw e; 
-                                }
-                                if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) {
-                                    throw new NoSuchElementException("Timeout waiting for idle object");
-                                } else {
-                                    continue; // keep looping
                                 }
-                            default:
-                                throw new IllegalArgumentException("WhenExhaustedAction property " + _whenExhaustedAction + " not recognized.");
-                        }
+                            } catch(InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                throw e; 
+                            }
+                            if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) {
+                                throw new NoSuchElementException("Timeout waiting for idle object");
+                            } else {
+                                continue; // keep looping
+                            }
+                        default:
+                            throw new IllegalArgumentException("WhenExhaustedAction property " + _whenExhaustedAction + " not recognized.");
                     }
                 }
-                _numActive++;
             }
 
-            // create new object when needed
             boolean newlyCreated = false;
-            if(null == pair) {
+            if(null == latch._pair) {
                 try {
                     Object obj = _factory.makeObject();
-                    pair = new ObjectTimestampPair(obj);
+                    latch._pair = new ObjectTimestampPair(obj);
                     newlyCreated = true;
                 } finally {
                     if (!newlyCreated) {
                         // object cannot be created
                         synchronized (this) {
-                            _numActive--;
-                            notifyAll();
+                            _numInternalProcessing--;
+                            // No need to reset latch - about to throw exception
+                            allocate();
                         }
                     }
                 }
             }
-
             // activate & validate the object
             try {
-                _factory.activateObject(pair.value);
-                if(_testOnBorrow && !_factory.validateObject(pair.value)) {
+                _factory.activateObject(latch._pair.value);
+                if(_testOnBorrow && !_factory.validateObject(latch._pair.value)) {
                     throw new Exception("ValidateObject failed");
                 }
-                return pair.value;
+                synchronized(this) {
+                    _numInternalProcessing--;
+                    _numActive++;
+                }
+                return latch._pair.value;
             }
             catch (Throwable e) {
                 // object cannot be activated or is invalid
                 try {
-                    _factory.destroyObject(pair.value);
+                    _factory.destroyObject(latch._pair.value);
                 } catch (Throwable e2) {
                     // cannot destroy broken object
                 }
                 synchronized (this) {
-                    _numActive--;
-                    notifyAll();
+                    _numInternalProcessing--;
+                    latch.reset();
+                    _allocationQueue.add(0, latch);
+                    allocate();
                 }
                 if(newlyCreated) {
                     throw new NoSuchElementException("Could not create a validated object, cause: " + e.getMessage());
@@ -1028,6 +1032,37 @@
         }
     }
 
+    private synchronized void allocate() {
+        // First use any objects in the pool to clear the queue
+        for (;;) {
+            if (isClosed()) return;
+            if (!_pool.isEmpty() && !_allocationQueue.isEmpty()) {
+                Latch latch = (Latch) _allocationQueue.removeFirst();
+                latch._pair = (ObjectTimestampPair) _pool.removeFirst();
+                _numInternalProcessing++;
+                synchronized (latch) {
+                    latch.notify();
+                }
+            } else {
+                break;
+            }
+        }
+        // Second utilise any spare capacity to create new objects
+        for(;;) {
+            if (isClosed()) return;
+            if((!_allocationQueue.isEmpty()) && (_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive)) {
+                Latch latch = (Latch) _allocationQueue.removeFirst();
+                latch._mayCreate = true;
+                _numInternalProcessing++;
+                synchronized (latch) {
+                    latch.notify();
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
     public void invalidateObject(Object obj) throws Exception {
         try {
             if (_factory != null) {
@@ -1036,7 +1071,7 @@
         } finally {
             synchronized (this) {
                 _numActive--;
-                notifyAll(); // _numActive has changed
+                allocate();
             }
         }
     }
@@ -1068,7 +1103,7 @@
             } finally {
                 synchronized(this) {
                     _numInternalProcessing--;
-                    notifyAll();
+                    allocate();
                 }
             }
         }
@@ -1117,7 +1152,7 @@
                 // "behavior flag",decrementNumActive, from addObjectToPool.
                 synchronized(this) {
                     _numActive--;
-                    notifyAll();
+                    allocate();
                 }
             }
         }
@@ -1152,7 +1187,7 @@
                     if (decrementNumActive) {
                         _numActive--;
                     }
-                    notifyAll();
+                    allocate();
                 }
             }
         }
@@ -1168,7 +1203,7 @@
             if (decrementNumActive) {
                 synchronized(this) {
                     _numActive--;
-                    notifyAll();
+                    allocate();
                 }
             }
         }
@@ -1319,7 +1354,7 @@
             } finally {
                 synchronized (this) {
                     _numInternalProcessing--;
-                    notifyAll();
+                    allocate();
                 }
             }
         }
@@ -1490,6 +1525,26 @@
     
     }
 
+    /**
+     * Latch used to control allocation order of objects to threads to ensure
+     * fairness. ie objects are allocated to threads in the order that threads
+     * request objects.
+     */
+    private static final class Latch {
+        ObjectTimestampPair _pair;
+        boolean _mayCreate = false;
+        
+        /**
+         * Reset the latch data. Used when an allocation fails and the latch
+         * needs to be re-added to the queue. 
+         */
+        private void reset() {
+            _pair = null;
+            _mayCreate = false;
+        }
+    }
+
+
     //--- private attributes ---------------------------------------
 
     /**
@@ -1664,4 +1719,12 @@
      * number of objects but are neither active nor idle.
      */
     private int _numInternalProcessing = 0;
+
+    /**
+     * Used to track the order in which threads call {@link #borrowObject()} so
+     * that objects can be allocated in the order in which the threads requested
+     * them.
+     */
+    private LinkedList _allocationQueue = new LinkedList();
+
 }