You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/27 06:00:30 UTC

svn commit: r1176188 - /incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java

Author: patanachai
Date: Tue Sep 27 04:00:30 2011
New Revision: 1176188

URL: http://svn.apache.org/viewvc?rev=1176188&view=rev
Log:
AIRAVATA-101 revise concurrency in ConnectionPool

Modified:
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java

Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java?rev=1176188&r1=1176187&r2=1176188&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java Tue Sep 27 04:00:30 2011
@@ -27,7 +27,7 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Stack;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Semaphore;
 
 import javax.sql.DataSource;
 
@@ -53,7 +53,7 @@ public class ConnectionPool {
     private boolean autoCommit = true;
     private boolean waitIfBusy;
 
-    private AtomicInteger needConnection = new AtomicInteger(0);
+    private Semaphore needConnection = new Semaphore(0);
     private boolean stop;
 
     private Stack<Connection> availableConnections;
@@ -143,12 +143,10 @@ public class ConnectionPool {
 
             // If connection on available list is closed (e.g.,
             // it timed out), then remove it from available list
-            // and repeat the process of obtaining a connection.
-            // Also wake up threads that were waiting for a
-            // connection because maxConnection limit was reached.
+            // and race for a connection again.
             if (existingConnection.isClosed()) {
                 lastAccessTimeRecord.remove(existingConnection);
-                // Freed up a spot for anybody waiting and race for it again
+                // notifyAll for fairness
                 notifyAll();
             } else {
                 busyConnections.push(existingConnection);
@@ -160,13 +158,15 @@ public class ConnectionPool {
             // Throw SQLException in such a case.
             throw new SQLException("Connection limit reached");
         } else {
-            // request connection creation and then wait
-            synchronized (needConnection) {
-                needConnection.incrementAndGet();
-                needConnection.notifyAll();
+
+            if (busyConnections.size() < maxConnections) {
+                // available connection is empty, but total number of connection
+                // doesn't reach maxConnection. Request for more connection
+                needConnection.release();
             }
 
             try {
+                // wait for free connection
                 wait();
             } catch (InterruptedException ie) {
             }
@@ -201,15 +201,12 @@ public class ConnectionPool {
         }
     }
 
-    private synchronized void fillUpConnection() throws SQLException {
-        synchronized (needConnection) {
-            if (needConnection.get() > 0) {
-                Connection connection = makeNewConnection();
-                availableConnections.push(connection);
-                setTimeStamp(connection);
-                needConnection.decrementAndGet();
-            }
-        }
+    private synchronized void fillUpConnection(Connection conn){
+        setTimeStamp(conn);
+        availableConnections.push(conn);
+
+        // notify all since new connection is created
+        notifyAll();
     }
 
     private void setTimeStamp(Connection connection) {
@@ -265,7 +262,7 @@ public class ConnectionPool {
                 try {
                     busyConnection.close();
                     iter.remove();
-                    logger.warn("****Connection has checked out too long. Forced release. Check the program for unReleased connection.");
+                    logger.warn("****Connection has checked out too long. Forced release. Check the program for calling release connection [free(Connection) method]");
                 } catch (SQLException sql) {
                     logger.error(sql.getMessage(), sql);
                 }
@@ -301,11 +298,11 @@ public class ConnectionPool {
         closeConnections(busyConnections);
         busyConnections = new Stack<Connection>();
         lastAccessTimeRecord.clear();
-        
-        try{
+
+        try {
             this.clenupThread.join();
             this.producerThread.join();
-        }catch(Exception e){
+        } catch (Exception e) {
             logger.error("Cannot shutdown cleanup thread", e);
         }
     }
@@ -347,19 +344,19 @@ public class ConnectionPool {
         public void run() {
             while (!stop) {
                 try {
-                    synchronized (needConnection) {
-                        needConnection.wait();
-                    }
+                    //block until get
+                    needConnection.acquire();
+                    
+                    Connection conn = makeNewConnection();                    
+                    fillUpConnection(conn);
+                } catch (SQLException e) {
+                    //cannot create connection (increase semaphore value back)
+                    needConnection.release();
+                    logger.error(e.getMessage(), e);
                 } catch (InterruptedException e) {
                     logger.info("Fill up thread is interrupted to close");
                     break;
                 }
-
-                try {
-                    fillUpConnection();
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
             }
         }
     }