You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by ma...@apache.org on 2009/05/17 18:42:34 UTC
svn commit: r775707 -
/commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java
Author: markt
Date: Sun May 17 16:42:34 2009
New Revision: 775707
URL: http://svn.apache.org/viewvc?rev=775707&view=rev
Log:
Fix POOL-75 for GOP. Ensure objects are allocated in the order in which threads ask for them.
Modified:
commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java
Modified: commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java
URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java?rev=775707&r1=775706&r2=775707&view=diff
==============================================================================
--- commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java (original)
+++ commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java Sun May 17 16:42:34 2009
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TimerTask;
@@ -539,7 +540,7 @@
*/
public synchronized void setMaxActive(int maxActive) {
_maxActive = maxActive;
- notifyAll();
+ allocate();
}
/**
@@ -570,7 +571,7 @@
case WHEN_EXHAUSTED_FAIL:
case WHEN_EXHAUSTED_GROW:
_whenExhaustedAction = whenExhaustedAction;
- notifyAll();
+ allocate();
break;
default:
throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized.");
@@ -614,7 +615,7 @@
*/
public synchronized void setMaxWait(long maxWait) {
_maxWait = maxWait;
- notifyAll();
+ allocate();
}
/**
@@ -641,7 +642,7 @@
*/
public synchronized void setMaxIdle(int maxIdle) {
_maxIdle = maxIdle;
- notifyAll();
+ allocate();
}
/**
@@ -658,7 +659,7 @@
*/
public synchronized void setMinIdle(int minIdle) {
_minIdle = minIdle;
- notifyAll();
+ allocate();
}
/**
@@ -917,43 +918,42 @@
setTimeBetweenEvictionRunsMillis(conf.timeBetweenEvictionRunsMillis);
setSoftMinEvictableIdleTimeMillis(conf.softMinEvictableIdleTimeMillis);
setLifo(conf.lifo);
- notifyAll();
+ allocate();
}
//-- ObjectPool methods ------------------------------------------
public Object borrowObject() throws Exception {
long starttime = System.currentTimeMillis();
+ Latch latch = new Latch();
+ synchronized (this) {
+ _allocationQueue.add(latch);
+ allocate();
+ }
+
for(;;) {
- ObjectTimestampPair pair = null;
-
synchronized (this) {
assertOpen();
- // if there are any sleeping, just grab one of those
- try {
- pair = (ObjectTimestampPair)(_pool.removeFirst());
- } catch(NoSuchElementException e) {
- /* ignored */
- }
-
- // otherwise
- if(null == pair) {
- // check if we can create one
- // (note we know that the num sleeping is 0, else we wouldn't be here)
- if(_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive) {
- // allow new object to be created
- } else {
- // the pool is exhausted
- switch(_whenExhaustedAction) {
- case WHEN_EXHAUSTED_GROW:
- // allow new object to be created
- break;
- case WHEN_EXHAUSTED_FAIL:
- throw new NoSuchElementException("Pool exhausted");
- case WHEN_EXHAUSTED_BLOCK:
- try {
+ }
+
+ // If no object was allocated from the pool above
+ if(latch._pair == null) {
+ // check if we were allowed to create one
+ if(latch._mayCreate) {
+ // allow new object to be created
+ } else {
+ // the pool is exhausted
+ switch(_whenExhaustedAction) {
+ case WHEN_EXHAUSTED_GROW:
+ // allow new object to be created
+ break;
+ case WHEN_EXHAUSTED_FAIL:
+ throw new NoSuchElementException("Pool exhausted");
+ case WHEN_EXHAUSTED_BLOCK:
+ try {
+ synchronized (latch) {
if(_maxWait <= 0) {
- wait();
+ latch.wait();
} else {
// this code may be executed again after a notify then continue cycle
// so, need to calculate the amount of time to wait
@@ -961,62 +961,66 @@
final long waitTime = _maxWait - elapsed;
if (waitTime > 0)
{
- wait(waitTime);
+ latch.wait(waitTime);
}
}
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- throw e;
- }
- if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) {
- throw new NoSuchElementException("Timeout waiting for idle object");
- } else {
- continue; // keep looping
}
- default:
- throw new IllegalArgumentException("WhenExhaustedAction property " + _whenExhaustedAction + " not recognized.");
- }
+ } catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) {
+ throw new NoSuchElementException("Timeout waiting for idle object");
+ } else {
+ continue; // keep looping
+ }
+ default:
+ throw new IllegalArgumentException("WhenExhaustedAction property " + _whenExhaustedAction + " not recognized.");
}
}
- _numActive++;
}
- // create new object when needed
boolean newlyCreated = false;
- if(null == pair) {
+ if(null == latch._pair) {
try {
Object obj = _factory.makeObject();
- pair = new ObjectTimestampPair(obj);
+ latch._pair = new ObjectTimestampPair(obj);
newlyCreated = true;
} finally {
if (!newlyCreated) {
// object cannot be created
synchronized (this) {
- _numActive--;
- notifyAll();
+ _numInternalProcessing--;
+ // No need to reset latch - about to throw exception
+ allocate();
}
}
}
}
-
// activate & validate the object
try {
- _factory.activateObject(pair.value);
- if(_testOnBorrow && !_factory.validateObject(pair.value)) {
+ _factory.activateObject(latch._pair.value);
+ if(_testOnBorrow && !_factory.validateObject(latch._pair.value)) {
throw new Exception("ValidateObject failed");
}
- return pair.value;
+ synchronized(this) {
+ _numInternalProcessing--;
+ _numActive++;
+ }
+ return latch._pair.value;
}
catch (Throwable e) {
// object cannot be activated or is invalid
try {
- _factory.destroyObject(pair.value);
+ _factory.destroyObject(latch._pair.value);
} catch (Throwable e2) {
// cannot destroy broken object
}
synchronized (this) {
- _numActive--;
- notifyAll();
+ _numInternalProcessing--;
+ latch.reset();
+ _allocationQueue.add(0, latch);
+ allocate();
}
if(newlyCreated) {
throw new NoSuchElementException("Could not create a validated object, cause: " + e.getMessage());
@@ -1028,6 +1032,37 @@
}
}
+ private synchronized void allocate() {
+ // First use any objects in the pool to clear the queue
+ for (;;) {
+ if (isClosed()) return;
+ if (!_pool.isEmpty() && !_allocationQueue.isEmpty()) {
+ Latch latch = (Latch) _allocationQueue.removeFirst();
+ latch._pair = (ObjectTimestampPair) _pool.removeFirst();
+ _numInternalProcessing++;
+ synchronized (latch) {
+ latch.notify();
+ }
+ } else {
+ break;
+ }
+ }
+ // Second utilise any spare capacity to create new objects
+ for(;;) {
+ if (isClosed()) return;
+ if((!_allocationQueue.isEmpty()) && (_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive)) {
+ Latch latch = (Latch) _allocationQueue.removeFirst();
+ latch._mayCreate = true;
+ _numInternalProcessing++;
+ synchronized (latch) {
+ latch.notify();
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
public void invalidateObject(Object obj) throws Exception {
try {
if (_factory != null) {
@@ -1036,7 +1071,7 @@
} finally {
synchronized (this) {
_numActive--;
- notifyAll(); // _numActive has changed
+ allocate();
}
}
}
@@ -1068,7 +1103,7 @@
} finally {
synchronized(this) {
_numInternalProcessing--;
- notifyAll();
+ allocate();
}
}
}
@@ -1117,7 +1152,7 @@
// "behavior flag",decrementNumActive, from addObjectToPool.
synchronized(this) {
_numActive--;
- notifyAll();
+ allocate();
}
}
}
@@ -1152,7 +1187,7 @@
if (decrementNumActive) {
_numActive--;
}
- notifyAll();
+ allocate();
}
}
}
@@ -1168,7 +1203,7 @@
if (decrementNumActive) {
synchronized(this) {
_numActive--;
- notifyAll();
+ allocate();
}
}
}
@@ -1319,7 +1354,7 @@
} finally {
synchronized (this) {
_numInternalProcessing--;
- notifyAll();
+ allocate();
}
}
}
@@ -1490,6 +1525,26 @@
}
+ /**
+ * Latch used to control allocation order of objects to threads to ensure
+ * fairness, i.e. objects are allocated to threads in the order that threads
+ * request objects.
+ */
+ private static final class Latch {
+ ObjectTimestampPair _pair;
+ boolean _mayCreate = false;
+
+ /**
+ * Reset the latch data. Used when an allocation fails and the latch
+ * needs to be re-added to the queue.
+ */
+ private void reset() {
+ _pair = null;
+ _mayCreate = false;
+ }
+ }
+
+
//--- private attributes ---------------------------------------
/**
@@ -1664,4 +1719,12 @@
* number of objects but are neither active nor idle.
*/
private int _numInternalProcessing = 0;
+
+ /**
+ * Used to track the order in which threads call {@link #borrowObject()} so
+ * that objects can be allocated in the order in which the threads requested
+ * them.
+ */
+ private LinkedList _allocationQueue = new LinkedList();
+
}