You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by ma...@apache.org on 2009/05/18 00:58:26 UTC
svn commit: r775775 -
/commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java
Author: markt
Date: Sun May 17 22:58:25 2009
New Revision: 775775
URL: http://svn.apache.org/viewvc?rev=775775&view=rev
Log:
Fix POOL-75 for GKOP. Objects are now allocated to threads in the order in which the threads made their request.
Modified:
commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java
Modified: commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java
URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java?rev=775775&r1=775774&r2=775775&view=diff
==============================================================================
--- commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java (original)
+++ commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java Sun May 17 22:58:25 2009
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -562,7 +563,7 @@
*/
public synchronized void setMaxActive(int maxActive) {
_maxActive = maxActive;
- notifyAll();
+ allocate();
}
/**
@@ -589,7 +590,7 @@
*/
public synchronized void setMaxTotal(int maxTotal) {
_maxTotal = maxTotal;
- notifyAll();
+ allocate();
}
/**
@@ -621,7 +622,7 @@
case WHEN_EXHAUSTED_FAIL:
case WHEN_EXHAUSTED_GROW:
_whenExhaustedAction = whenExhaustedAction;
- notifyAll();
+ allocate();
break;
default:
throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized.");
@@ -693,7 +694,7 @@
*/
public synchronized void setMaxIdle(int maxIdle) {
_maxIdle = maxIdle;
- notifyAll();
+ allocate();
}
/**
@@ -944,51 +945,37 @@
public Object borrowObject(Object key) throws Exception {
long starttime = System.currentTimeMillis();
+ Latch latch = new Latch(key);
+ synchronized (this) {
+ _allocationQueue.add(latch);
+ allocate();
+ }
+
for(;;) {
- ObjectTimestampPair pair = null;
- ObjectQueue pool = null;
synchronized (this) {
assertOpen();
- pool = (ObjectQueue)(_poolMap.get(key));
- if(null == pool) {
- pool = new ObjectQueue();
- _poolMap.put(key,pool);
- _poolList.add(key);
- }
- // if there are any sleeping, just grab one of those
- try {
- pair = (ObjectTimestampPair)(pool.queue.removeFirst());
- if(null != pair) {
- _totalIdle--;
- }
- } catch(NoSuchElementException e) { /* ignored */
- }
- // otherwise
- if(null == pair) {
- // if there is a totalMaxActive and we are at the limit then
- // we have to make room
- if ((_maxTotal > 0)
- && (_totalActive + _totalIdle + _totalInternalProcessing >= _maxTotal)) {
- clearOldest();
- }
-
- // check if we can create one
- // (note we know that the num sleeping is 0, else we wouldn't be here)
- if ((_maxActive < 0 || pool.activeCount + pool.internalProcessingCount < _maxActive) &&
- (_maxTotal < 0 || _totalActive + _totalIdle + _totalInternalProcessing < _maxTotal)) {
+ }
+ // If no object was allocated
+ if (null == latch._pair) {
+ // Check to see if we were allowed to create one
+ if (latch._mayCreate) {
+ // allow new object to be created
+ } else {
+ // the pool is exhausted
+ switch(_whenExhaustedAction) {
+ case WHEN_EXHAUSTED_GROW:
// allow new object to be created
- } else {
- // the pool is exhausted
- switch(_whenExhaustedAction) {
- case WHEN_EXHAUSTED_GROW:
- // allow new object to be created
- break;
- case WHEN_EXHAUSTED_FAIL:
- throw new NoSuchElementException("Pool exhausted");
- case WHEN_EXHAUSTED_BLOCK:
- try {
+ break;
+ case WHEN_EXHAUSTED_FAIL:
+ synchronized (this) {
+ _allocationQueue.remove(latch);
+ }
+ throw new NoSuchElementException("Pool exhausted");
+ case WHEN_EXHAUSTED_BLOCK:
+ try {
+ synchronized (latch) {
if(_maxWait <= 0) {
- wait();
+ latch.wait();
} else {
// this code may be executed again after a notify then continue cycle
// so, need to calculate the amount of time to wait
@@ -996,38 +983,38 @@
final long waitTime = _maxWait - elapsed;
if (waitTime > 0)
{
- wait(waitTime);
+ latch.wait(waitTime);
}
}
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- throw e;
}
- if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) {
- throw new NoSuchElementException("Timeout waiting for idle object");
- } else {
- continue; // keep looping
+ } catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
}
- default:
- throw new IllegalArgumentException("whenExhaustedAction " + _whenExhaustedAction + " not recognized.");
- }
+ if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) {
+ throw new NoSuchElementException("Timeout waiting for idle object");
+ } else {
+ continue; // keep looping
+ }
+ default:
+ throw new IllegalArgumentException("whenExhaustedAction " + _whenExhaustedAction + " not recognized.");
}
}
- pool.incrementActiveCount();
}
boolean newlyCreated = false;
- if (null == pair) {
+ if (null == latch._pair) {
try {
Object obj = _factory.makeObject(key);
- pair = new ObjectTimestampPair(obj);
+ latch._pair = new ObjectTimestampPair(obj);
newlyCreated = true;
} finally {
if (!newlyCreated) {
// object cannot be created
synchronized (this) {
- pool.decrementActiveCount();
- notifyAll();
+ latch._pool.decrementInternalProcessingCount();
+ // No need to reset latch - about to throw exception
+ allocate();
}
}
}
@@ -1035,21 +1022,27 @@
// activate & validate the object
try {
- _factory.activateObject(key, pair.value);
- if (_testOnBorrow && !_factory.validateObject(key, pair.value)) {
+ _factory.activateObject(key, latch._pair.value);
+ if (_testOnBorrow && !_factory.validateObject(key, latch._pair.value)) {
throw new Exception("ValidateObject failed");
}
- return pair.value;
+ synchronized (this) {
+ latch._pool.decrementInternalProcessingCount();
+ latch._pool.incrementActiveCount();
+ }
+ return latch._pair.value;
} catch (Throwable e) {
// object cannot be activated or is invalid
try {
- _factory.destroyObject(key,pair.value);
+ _factory.destroyObject(key, latch._pair.value);
} catch (Throwable e2) {
// cannot destroy broken object
}
synchronized (this) {
- pool.decrementActiveCount();
- notifyAll();
+ latch._pool.decrementInternalProcessingCount();
+ latch.reset();
+ _allocationQueue.add(0, latch);
+ allocate();
}
if(newlyCreated) {
throw new NoSuchElementException(
@@ -1063,6 +1056,58 @@
}
}
+ private synchronized void allocate() {
+ if (isClosed()) return;
+
+ for (;;) {
+ if (!_allocationQueue.isEmpty()) {
+ // First use any objects in the pool to clear the queue
+ Latch latch = (Latch) _allocationQueue.getFirst();
+ ObjectQueue pool = (ObjectQueue)(_poolMap.get(latch._key));
+ if(null == pool) {
+ pool = new ObjectQueue();
+ _poolMap.put(latch._key, pool);
+ _poolList.add(latch._key);
+ }
+ latch._pool = pool;
+ if (!pool.queue.isEmpty()) {
+ _allocationQueue.removeFirst();
+ latch._pair = (ObjectTimestampPair) pool.queue.removeFirst();
+ pool.incrementInternalProcessingCount();
+ _totalIdle--;
+ synchronized (latch) {
+ latch.notify();
+ }
+ // Next item in queue
+ continue;
+ }
+
+ // If there is a totalMaxActive and we are at the limit then
+ // we have to make room
+ if ((_maxTotal > 0)
+ && (_totalActive + _totalIdle + _totalInternalProcessing >= _maxTotal)) {
+ clearOldest();
+ }
+
+
+ // Second utilise any spare capacity to create new objects
+ if ((_maxActive < 0 || pool.activeCount + pool.internalProcessingCount < _maxActive) &&
+ (_maxTotal < 0 || _totalActive + _totalIdle + _totalInternalProcessing < _maxTotal)) {
+ // allow new object to be created
+ _allocationQueue.removeFirst();
+ latch._mayCreate = true;
+ pool.incrementInternalProcessingCount();
+ synchronized (latch) {
+ latch.notify();
+ }
+ // Next item in queue
+ continue;
+ }
+ }
+ break;
+ }
+ }
+
/**
* Clears the pool, removing all pooled instances.
*/
@@ -1184,7 +1229,7 @@
} finally {
synchronized(this) {
_totalInternalProcessing--;
- notifyAll();
+ allocate();
}
}
}
@@ -1250,7 +1295,7 @@
if (pool != null) {
synchronized(this) {
pool.decrementActiveCount();
- notifyAll();
+ allocate();
}
}
}
@@ -1301,7 +1346,7 @@
if (decrementNumActive) {
pool.decrementActiveCount();
}
- notifyAll();
+ allocate();
}
}
}
@@ -1317,7 +1362,7 @@
if (decrementNumActive) {
synchronized(this) {
pool.decrementActiveCount();
- notifyAll();
+ allocate();
}
}
}
@@ -1335,7 +1380,7 @@
_poolList.add(key);
}
pool.decrementActiveCount();
- notifyAll(); // _totalActive has changed
+ allocate(); // _totalActive has changed
}
}
}
@@ -1696,7 +1741,7 @@
} finally {
synchronized (this) {
pool.decrementInternalProcessingCount();
- notifyAll();
+ allocate();
}
}
}
@@ -1941,6 +1986,31 @@
public boolean lifo = GenericKeyedObjectPool.DEFAULT_LIFO;
}
+ /**
+ * Latch used to control allocation order of objects to threads to ensure
+ * fairness, i.e. objects are allocated to threads in the order that threads
+ * request objects.
+ */
+ private static final class Latch {
+ Object _key;
+ ObjectQueue _pool;
+ ObjectTimestampPair _pair;
+ boolean _mayCreate = false;
+
+ private Latch(Object key) {
+ _key = key;
+ }
+
+ /**
+ * Reset the latch data. Used when an allocation fails and the latch
+ * needs to be re-added to the queue.
+ */
+ private void reset() {
+ _pair = null;
+ _mayCreate = false;
+ }
+ }
+
//--- protected attributes ---------------------------------------
/**
@@ -2115,4 +2185,12 @@
/** Whether or not the pools behave as LIFO queues (last in first out) */
private boolean _lifo = DEFAULT_LIFO;
+
+ /**
+ * Used to track the order in which threads call {@link #borrowObject(Object)} so
+ * that objects can be allocated in the order in which the threads requested
+ * them.
+ */
+ private LinkedList _allocationQueue = new LinkedList();
+
}