You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/27 06:00:30 UTC
svn commit: r1176188 - /incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
Author: patanachai
Date: Tue Sep 27 04:00:30 2011
New Revision: 1176188
URL: http://svn.apache.org/viewvc?rev=1176188&view=rev
Log:
AIRAVATA-101 revise concurrency in ConnectionPool
Modified:
incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java?rev=1176188&r1=1176187&r2=1176188&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java Tue Sep 27 04:00:30 2011
@@ -27,7 +27,7 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Stack;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Semaphore;
import javax.sql.DataSource;
@@ -53,7 +53,7 @@ public class ConnectionPool {
private boolean autoCommit = true;
private boolean waitIfBusy;
- private AtomicInteger needConnection = new AtomicInteger(0);
+ private Semaphore needConnection = new Semaphore(0);
private boolean stop;
private Stack<Connection> availableConnections;
@@ -143,12 +143,10 @@ public class ConnectionPool {
// If connection on available list is closed (e.g.,
// it timed out), then remove it from available list
- // and repeat the process of obtaining a connection.
- // Also wake up threads that were waiting for a
- // connection because maxConnection limit was reached.
+ // and race for a connection again.
if (existingConnection.isClosed()) {
lastAccessTimeRecord.remove(existingConnection);
- // Freed up a spot for anybody waiting and race for it again
+ // notifyAll for fairness
notifyAll();
} else {
busyConnections.push(existingConnection);
@@ -160,13 +158,15 @@ public class ConnectionPool {
// Throw SQLException in such a case.
throw new SQLException("Connection limit reached");
} else {
- // request connection creation and then wait
- synchronized (needConnection) {
- needConnection.incrementAndGet();
- needConnection.notifyAll();
+
+ if (busyConnections.size() < maxConnections) {
+ // available connection is empty, but total number of connection
+ // doesn't reach maxConnection. Request for more connection
+ needConnection.release();
}
try {
+ // wait for free connection
wait();
} catch (InterruptedException ie) {
}
@@ -201,15 +201,12 @@ public class ConnectionPool {
}
}
- private synchronized void fillUpConnection() throws SQLException {
- synchronized (needConnection) {
- if (needConnection.get() > 0) {
- Connection connection = makeNewConnection();
- availableConnections.push(connection);
- setTimeStamp(connection);
- needConnection.decrementAndGet();
- }
- }
+ private synchronized void fillUpConnection(Connection conn){
+ setTimeStamp(conn);
+ availableConnections.push(conn);
+
+ // notify all since new connection is created
+ notifyAll();
}
private void setTimeStamp(Connection connection) {
@@ -265,7 +262,7 @@ public class ConnectionPool {
try {
busyConnection.close();
iter.remove();
- logger.warn("****Connection has checked out too long. Forced release. Check the program for unReleased connection.");
+ logger.warn("****Connection has checked out too long. Forced release. Check the program for calling release connection [free(Connection) method]");
} catch (SQLException sql) {
logger.error(sql.getMessage(), sql);
}
@@ -301,11 +298,11 @@ public class ConnectionPool {
closeConnections(busyConnections);
busyConnections = new Stack<Connection>();
lastAccessTimeRecord.clear();
-
- try{
+
+ try {
this.clenupThread.join();
this.producerThread.join();
- }catch(Exception e){
+ } catch (Exception e) {
logger.error("Cannot shutdown cleanup thread", e);
}
}
@@ -347,19 +344,19 @@ public class ConnectionPool {
public void run() {
while (!stop) {
try {
- synchronized (needConnection) {
- needConnection.wait();
- }
+ //block until get
+ needConnection.acquire();
+
+ Connection conn = makeNewConnection();
+ fillUpConnection(conn);
+ } catch (SQLException e) {
+ //cannot create connection (increase semaphore value back)
+ needConnection.release();
+ logger.error(e.getMessage(), e);
} catch (InterruptedException e) {
logger.info("Fill up thread is interrupted to close");
break;
}
-
- try {
- fillUpConnection();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
}
}
}