You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/20 01:13:34 UTC

svn commit: r1172890 - in /incubator/airavata/trunk/modules/ws-messenger: commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/ messagebox/src/main/org/apache/airavata/wsmg/msgbox/S...

Author: patanachai
Date: Mon Sep 19 23:13:33 2011
New Revision: 1172890

URL: http://svn.apache.org/viewvc?rev=1172890&view=rev
Log:
AIRAVATA-82 fix synchronization and always release table lock

Modified:
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java

Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java Mon Sep 19 23:13:33 2011
@@ -105,6 +105,15 @@ public class ConnectionPool {
         initialize(initialConnections, maxConnections, waitIfBusy);
     }
 
+    /**
+     * Reports whether this connection pool is configured for auto-commit.
+     * 
+     * @return the value of this pool's autoCommit flag
+     */
+    public boolean isAutoCommit() {
+        return this.autoCommit;
+    }
+
     private void initialize(int initialConnections, int maxConnections, boolean waitIfBusy) throws SQLException {
         this.maxConnections = maxConnections;
         this.waitIfBusy = waitIfBusy;

Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java Mon Sep 19 23:13:33 2011
@@ -32,10 +32,6 @@ import org.slf4j.LoggerFactory;
 public class JdbcStorage {
     private static Logger log = LoggerFactory.getLogger(JdbcStorage.class);
 
-    private PreparedStatement stmt = null;
-
-    private ResultSet rs = null;
-
     private ConnectionPool connectionPool;
 
     public JdbcStorage(String jdbcUrl, String jdbcDriver) {
@@ -50,12 +46,55 @@ public class JdbcStorage {
                         Connection.TRANSACTION_SERIALIZABLE);
             } else {
                 connectionPool = new ConnectionPool(driver, url, initCon, maxCon, true);
-            }            
+            }
         } catch (Exception e) {
             throw new RuntimeException("Failed to create database connection pool.", e);
         }
     }
 
+    /**
+     * Reports whether the underlying connection pool is configured for auto-commit.
+     * 
+     * @return the value returned by the connection pool's isAutoCommit()
+     */
+    public boolean isAutoCommit() {
+        return connectionPool.isAutoCommit();
+    }
+
+    public void commit(Connection conn) {
+        try {
+            if (conn != null && !conn.getAutoCommit()) {
+                conn.commit();
+            }
+        } catch (SQLException sqle) {
+            log.error("Cannot commit data", sqle);
+        }
+    }
+
+    public void commitAndFree(Connection conn) {
+        commit(conn);
+        if (conn != null) {
+            closeConnection(conn);
+        }
+    }
+
+    public void rollback(Connection conn) {
+        try {
+            if (conn != null && !conn.getAutoCommit()) {
+                conn.rollback();
+            }
+        } catch (SQLException sqle) {
+            log.error("Cannot Rollback data", sqle);
+        }
+    }
+
+    public void rollbackAndFree(Connection conn) {
+        rollback(conn);
+        if (conn != null) {
+            closeConnection(conn);
+        }
+    }
+
     public Connection connect() {
 
         Connection conn = null;
@@ -75,17 +114,21 @@ public class JdbcStorage {
     public int update(String query) throws SQLException {
         int result = 0;
         Connection conn = null;
+        PreparedStatement stmt = null;
         try {
             conn = connectionPool.getConnection();
             stmt = conn.prepareStatement(query);
             result = stmt.executeUpdate();
+            commit(conn);
+        } catch (SQLException sql) {
+            rollback(conn);
+            throw sql;
         } finally {
-            if (conn != null) {
-                connectionPool.free(conn);
-            }
-
             if (stmt != null && !stmt.isClosed()) {
                 stmt.close();
+            }            
+            if(conn!=null){
+                closeConnection(conn);
             }
         }
         return result;
@@ -111,45 +154,24 @@ public class JdbcStorage {
         return rows;
     }
 
-    public ResultSet query(String query) throws SQLException {
-        ResultSet rs = null;
-        Connection conn = null;
-        try {
-            conn = connectionPool.getConnection();
-            stmt = conn.prepareStatement(query);
-            rs = stmt.executeQuery();
-            conn.setAutoCommit(false);
-        } finally {
-            if (conn != null) {
-                connectionPool.free(conn);
-            }
-        }
-        return rs;
-    }
-
-    public void close() throws SQLException {
-        if (stmt != null && !stmt.isClosed()) {
-            stmt.close();
-        }
-    }
-
     public int countRow(String tableName, String columnName) throws SQLException {
         String query = new String("SELECT COUNT(" + columnName + ") FROM " + tableName);
         int count = -1;
         Connection conn = null;
+        PreparedStatement stmt = null;
         try {
             conn = connectionPool.getConnection();
             stmt = conn.prepareStatement(query);
-            rs = stmt.executeQuery();
+            ResultSet rs = stmt.executeQuery();
             rs.next();
             count = rs.getInt(1);
         } finally {
-            if (conn != null) {
-                connectionPool.free(conn);
-            }
             if (stmt != null && !stmt.isClosed()) {
                 stmt.close();
             }
+            if (conn != null) {
+                connectionPool.free(conn);
+            }
         }
         return count;
     }
@@ -162,34 +184,19 @@ public class JdbcStorage {
     public int insert(String query) throws SQLException {
         int rows = 0;
         Connection conn = null;
+        PreparedStatement stmt = null;
         try {
             conn = connectionPool.getConnection();
             stmt = conn.prepareStatement(query);
             rows = stmt.executeUpdate();
         } finally {
-            if (conn != null) {
-                connectionPool.free(conn);
-            }
             if (stmt != null && !stmt.isClosed()) {
                 stmt.close();
             }
-        }
-        return rows;
-    }
 
-    /**
-     * This method is provided so that yo can have better control over the
-     * statement. For example: You can use stmt.setString to convert quotation
-     * mark automatically in an INSERT statement
-     * 
-     * NOTE: Statement is closed after execution
-     */
-    public int insertAndClose(PreparedStatement stmt) throws SQLException {
-        int rows = 0;
-        try {
-            rows = stmt.executeUpdate();
-        } finally {
-            stmt.close();
+            if (conn != null) {
+                connectionPool.free(conn);
+            }
         }
         return rows;
     }
@@ -197,5 +204,5 @@ public class JdbcStorage {
     public void closeAllConnections() {
         if (connectionPool != null)
             connectionPool.dispose();
-    }
+    }    
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java Mon Sep 19 23:13:33 2011
@@ -26,19 +26,16 @@ import java.util.List;
 import org.apache.axiom.om.OMElement;
 
 /**
- * Message Box storage backend. This has implemented in two ways inmemory and database.
+ * Message Box storage backend. It is implemented in two ways: in-memory and
+ * database.
  */
 public interface MsgBoxStorage {
     public String createMsgBox() throws Exception;
 
-    // DELETE FROM msgbox WHERE msgboxid=key
     public void destroyMsgBox(String key) throws Exception;
 
-    // SELECT * FROM msgbox WHERE msgboxid=key ORDER BY id LIMIT 1
-    // DELETE FROM msgbox WHERE msgboxid=key AND id=*
     public List<String> takeMessagesFromMsgBox(String key) throws Exception;
 
-    // INSERT INTO msgbox ...
     public void putMessageIntoMsgBox(String msgBoxID, String messageID, String soapAction, OMElement message)
             throws Exception;
 
@@ -46,6 +43,4 @@ public interface MsgBoxStorage {
      * The ancientness is defined in the db.config file.
      */
     public void removeAncientMessages() throws Exception;
-
-    // public void closeConnection() throws Exception;
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java Mon Sep 19 23:13:33 2011
@@ -36,21 +36,15 @@ import org.apache.axiom.om.OMElement;
  * Database message Storage Implementation, if msgBox.properties configured to use database this will set as the storage
  * for MsgBoxSerivceSkeleton
  */
-public class DatabaseStorageImpl implements MsgBoxStorage {
-
-    // private MessageBoxDB messageBoxDB;
-
-    // private final static MLogger logger = MLogger.getLogger();
+public class DatabaseStorageImpl implements MsgBoxStorage {  
 
     public DatabaseStorageImpl(JdbcStorage db, long timeOfOldMessage) throws SQLException {
         MessageBoxDB.initialize(db, timeOfOldMessage);
     }
 
     public String createMsgBox() throws SQLException, IOException {
-        // String uuid = FastUUIDGen.nextUUID();// generate uuid
-
         String uuid = UUID.randomUUID().toString();
-        MessageBoxDB.getInstance().createMsgBx(uuid, "msgBoxes");
+        MessageBoxDB.getInstance().createMsgBx(uuid);
         return uuid;
     }
 
@@ -60,13 +54,9 @@ public class DatabaseStorageImpl impleme
         } catch (SQLException e) {
             throw new Exception("Could not destroy the message box with key " + key, e);
         }
-        // logger.finest("Message box with key " + key +
-        // " was destroyed successfully");
-
     }
 
     public List<String> takeMessagesFromMsgBox(String key) throws Exception {
-        // String[] message;
         List<String> list = null;
 
         try {

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java Mon Sep 19 23:13:33 2011
@@ -31,7 +31,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.Timestamp;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -43,87 +43,116 @@ import javax.xml.stream.XMLStreamExcepti
 import org.apache.airavata.wsmg.commons.storage.JdbcStorage;
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is the core class which used by DatabaseStorageImpl to perform all the service operations, DatabaseStorageImpl class
- * simply use this class in its operation methods to perform the actual funcationality.
+ * This is the core class used by DatabaseStorageImpl to perform all the
+ * service operations; DatabaseStorageImpl simply calls this class from its
+ * operation methods to perform the actual functionality.
  */
 public class MessageBoxDB {
 
-    static Logger logger = Logger.getLogger(MessageBoxDB.class);
+    private static final String MSGBOXES_TABLENAME = "msgBoxes";
+    private static final String MSGBOX_TABLENAME = "msgbox";
+
+    private static final Logger logger = LoggerFactory.getLogger(MessageBoxDB.class);
 
     private static Set<String> msgBoxids;
 
-    public static final String SELECT_ALL_FROM_MSGBOXES = "SELECT * FROM msgBoxes";    
+    public static final String SELECT_ALL_FROM_MSGBOXES = "SELECT * FROM " + MSGBOXES_TABLENAME;
+
+    public static final String SQL_CREATE_MSGBOXES_STATEMENT = "INSERT INTO " + MSGBOXES_TABLENAME
+            + " (msgboxid) VALUES (?)";
 
-    public static String SQL_STORE_MESSAGE_STATEMENT = "INSERT INTO msgbox (content, msgboxid, messageid,soapaction) VALUES (?,?,?,?)";
+    public static final String SQL_DELETE_MSGBOXES_STATEMENT = "DELETE FROM " + MSGBOXES_TABLENAME
+            + " WHERE msgboxid = ?";
 
-    public static String SQL_CREATE_MSGBOX_STATEMENT = "INSERT INTO %s (msgboxid) VALUES ('%s')";
+    public static final String SQL_STORE_MESSAGE_STATEMENT = "INSERT INTO " + MSGBOX_TABLENAME
+            + " (content, msgboxid, messageid,soapaction) VALUES (?,?,?,?)";
 
-    public static String SQL_DELETE_ALL_STATEMENT = "DELETE FROM %s WHERE msgboxid='%s'";
+    public static final String SQL_SELECT_MSGBOX_STATEMENT = "SELECT * FROM " + MSGBOX_TABLENAME
+            + " WHERE msgboxid = ? ORDER BY time ";
 
-    public static String SQL_SELECT_STATEMENT1 = "SELECT * FROM %s WHERE msgboxid='%s' ORDER BY time ";
+    public static final String SQL_DELETE_MSGBOX_STATEMENT = "DELETE FROM " + MSGBOX_TABLENAME + " WHERE msgboxid = ?";
+
+    public static final String SQL_DELETE_ANCIENT_STATEMENT = "DELETE FROM " + MSGBOX_TABLENAME + " WHERE time < ?";
 
-    public static String SQL_DELETE_ANCIENT_STATEMENT = "DELETE FROM %s WHERE time <'%s'";
-    
     private JdbcStorage db;
-    
+
     private static MessageBoxDB instance;
-    
-    private long time;    
 
-    private MessageBoxDB(JdbcStorage db) {
+    private long time;
+
+    private MessageBoxDB(JdbcStorage db, long time) {
         this.db = db;
+        this.time = time;
     }
-    
-    public static MessageBoxDB initialize(JdbcStorage db, long time) throws SQLException{
-        if(instance == null){
-            instance = new MessageBoxDB(db);
+
+    public static MessageBoxDB initialize(JdbcStorage db, long time) throws SQLException {
+        if (instance == null) {
+            instance = new MessageBoxDB(db, time);
             setMsgBoxidList(db);
         }
         return instance;
     }
-    
-    public static MessageBoxDB getInstance(){
-        if(instance==null){
+
+    public static MessageBoxDB getInstance() {
+        if (instance == null) {
             throw new RuntimeException("Please initialize this object first using initialize(JdbcStorage, long)");
         }
         return instance;
     }
 
-    public void createMsgBx(String messageBoxId, String tableName) throws SQLException, IOException {
+    public void createMsgBx(String messageBoxId) throws SQLException, IOException {
         if (!msgBoxids.contains(messageBoxId)) {
-            Connection connection = db.connect();
-            Statement statement = connection.createStatement();
-            System.out.println(tableName + ":" + messageBoxId);
-            statement.execute(String.format(SQL_CREATE_MSGBOX_STATEMENT, tableName, messageBoxId));
-            connection.commit();
-            db.closeConnection(connection);
-            msgBoxids.add(messageBoxId);
-        } else
+
+            Connection connection = null;
+            try {
+                logger.debug(MSGBOXES_TABLENAME + ":" + messageBoxId);
+
+                connection = db.connect();
+                PreparedStatement statement = connection.prepareStatement(SQL_CREATE_MSGBOXES_STATEMENT);
+                statement.setString(1, messageBoxId);
+                db.executeUpdateAndClose(statement);
+                db.commitAndFree(connection);
+
+                msgBoxids.add(messageBoxId);
+
+            } catch (SQLException sql) {
+                db.rollbackAndFree(connection);
+                throw sql;
+            }
+        } else {
             throw new AxisFault("The message box ID requested already exists");
+        }
     }
 
     public void addMessage(String msgBoxID, String messageID, String soapAction, OMElement message)
             throws SQLException, IOException, XMLStreamException {
         if (msgBoxids.contains(msgBoxID)) {
-            Connection connection = db.connect();
-            PreparedStatement stmt = connection.prepareStatement(SQL_STORE_MESSAGE_STATEMENT);
-            byte[] buffer;
-            ByteArrayOutputStream output = new ByteArrayOutputStream();
-            ObjectOutputStream out = new ObjectOutputStream(output);
-            out.writeObject(message.toStringWithConsume());
-            buffer = output.toByteArray();
-            ByteArrayInputStream in = new ByteArrayInputStream(buffer);
-            stmt.setBinaryStream(1, in, buffer.length);
-            stmt.setString(2, msgBoxID);
-            stmt.setString(3, messageID);
-            stmt.setString(4, soapAction);
-            db.insertAndClose(stmt);
-            stmt.close();
-            connection.commit();
-            db.closeConnection(connection);
+
+            Connection connection = null;
+            try {
+                connection = db.connect();
+                PreparedStatement stmt = connection.prepareStatement(SQL_STORE_MESSAGE_STATEMENT);
+                ByteArrayOutputStream output = new ByteArrayOutputStream();
+                ObjectOutputStream out = new ObjectOutputStream(output);
+                out.writeObject(message.toStringWithConsume());
+                byte[] buffer = output.toByteArray();
+                ByteArrayInputStream in = new ByteArrayInputStream(buffer);
+                stmt.setBinaryStream(1, in, buffer.length);
+                stmt.setString(2, msgBoxID);
+                stmt.setString(3, messageID);
+                stmt.setString(4, soapAction);
+
+                db.executeUpdateAndClose(stmt);
+                db.commitAndFree(connection);
+
+            } catch (SQLException sql) {
+                db.rollbackAndFree(connection);
+                throw sql;
+            }
         } else {
             throw new AxisFault("Currently a messagebox is not available with given message box id :" + msgBoxID);
         }
@@ -132,12 +161,27 @@ public class MessageBoxDB {
     public void deleteMessageBox(String msgBoxId) throws SQLException {
 
         if (msgBoxids.contains(msgBoxId)) {
-            Connection connection = db.connect();
-            Statement statement = connection.createStatement();
-            statement.execute(String.format(SQL_DELETE_ALL_STATEMENT, "msgbox", msgBoxId));
-            statement.execute(String.format(SQL_DELETE_ALL_STATEMENT, "msgBoxes", msgBoxId));
-            db.closeConnection(connection);
-            msgBoxids.remove(msgBoxId);
+
+            Connection connection = null;
+            try {
+                connection = db.connect();
+                PreparedStatement statement = connection.prepareStatement(SQL_DELETE_MSGBOXES_STATEMENT);
+                statement.setString(1, msgBoxId);
+                db.executeUpdateAndClose(statement);
+                statement = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
+                statement.setString(1, msgBoxId);
+                db.executeUpdateAndClose(statement);
+
+                // commit
+                db.commitAndFree(connection);
+
+                // remove from set
+                msgBoxids.remove(msgBoxId);
+
+            } catch (SQLException sql) {
+                db.rollbackAndFree(connection);
+                throw sql;
+            }
         }
     }
 
@@ -145,57 +189,97 @@ public class MessageBoxDB {
             ClassNotFoundException, XMLStreamException {
         LinkedList<String> list = new LinkedList<String>();
         if (msgBoxids.contains(msgBoxId)) {
-            Connection connection = db.connect();
-
-            PreparedStatement stmt = connection.prepareStatement(String.format(SQL_SELECT_STATEMENT1, "msgbox",
-                    msgBoxId));
-            ResultSet resultSet = stmt.executeQuery();
-//            resultSet.beforeFirst();
 
-            while (resultSet.next()) {
-                InputStream in = resultSet.getAsciiStream("content");
-                ObjectInputStream s = new ObjectInputStream(in);
-                String xmlString = (String) s.readObject();
-                System.out.println(xmlString);
-                list.addFirst(xmlString);
-            }
-            resultSet.close();
-            stmt.close();
-            stmt = connection.prepareStatement(String.format(SQL_DELETE_ALL_STATEMENT, "msgbox", msgBoxId));
-            db.insertAndClose(stmt);
-            stmt.close();
-            connection.commit();
-            db.closeConnection(connection);
+            Connection connection = null;
+            PreparedStatement stmt = null;
+            try {
+                connection = db.connect();
+                stmt = connection.prepareStatement(SQL_SELECT_MSGBOX_STATEMENT);
+                stmt.setString(1, msgBoxId);
+                ResultSet resultSet = stmt.executeQuery();
+                while (resultSet.next()) {
+                    InputStream in = resultSet.getAsciiStream("content");
+                    ObjectInputStream s = new ObjectInputStream(in);
+                    String xmlString = (String) s.readObject();
+                    logger.debug(xmlString);
+                    list.addFirst(xmlString);
+                }
+                resultSet.close();
+                stmt.close();
+
+                /*
+                 * Delete all retrieved messages
+                 */
+                stmt = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
+                stmt.setString(1, msgBoxId);
+                db.executeUpdateAndClose(stmt);
+
+                // commit
+                db.commit(connection);
+            } catch (SQLException sql) {
+                db.rollback(connection);
+                throw sql;
+            } finally {
+
+                /*
+                 * Always close the statement and release the connection, even
+                 * when the query failed; an error from closing is rethrown
+                 */
+                try {
+                    if (stmt != null && !stmt.isClosed()) {
+                        stmt.close();
+                    }
+                } catch (SQLException sql) {
+                    throw sql;
+                } finally {
+                    if (connection != null) {
+                        db.closeConnection(connection);
+                    }
+                }
+            }
         }
         return list;
     }
 
     public void removeAncientMessages() {
+        Connection connection = null;
         try {
-            Connection connection = db.connect();
+            connection = db.connect();
+            PreparedStatement stmt = connection.prepareStatement(SQL_DELETE_ANCIENT_STATEMENT);
             long persevetime = System.currentTimeMillis() - this.time;
-            PreparedStatement stmt = connection.prepareStatement(String.format(SQL_DELETE_ANCIENT_STATEMENT, "msgBox",
-                    persevetime));
-            db.insertAndClose(stmt);
-            stmt.close();
-            db.closeConnection(connection);
-        } catch (SQLException e) {
-            logger.fatal("Caught exception while removing old entries from msgbox db table", e);
+            stmt.setTimestamp(1, new Timestamp(persevetime));
+            db.executeUpdateAndClose(stmt);
+            db.commitAndFree(connection);
+        } catch (SQLException sql) {
+            db.rollbackAndFree(connection);
+            logger.error("Caught exception while removing old entries from msgbox db table", sql);
         }
 
     }
 
     private static void setMsgBoxidList(JdbcStorage db) throws SQLException {
         msgBoxids = Collections.synchronizedSet(new HashSet<String>());
-        Connection connection = db.connect();
-        Statement statement = connection.createStatement();
-        ResultSet resultSet = statement.executeQuery(SELECT_ALL_FROM_MSGBOXES);
-        while (resultSet.next()) {
-            msgBoxids.add(resultSet.getString("msgboxid"));
-        }
-        statement.close();
-        connection.commit();
-        db.closeConnection(connection);
+
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        try {
+            connection = db.connect();
+            stmt = connection.prepareStatement(SELECT_ALL_FROM_MSGBOXES);
+            ResultSet resultSet = stmt.executeQuery();
+            while (resultSet.next()) {
+                msgBoxids.add(resultSet.getString("msgboxid"));
+            }
+        } finally {
+            try {
+                if (stmt != null) {
+                    stmt.close();
+                }
+            } catch (SQLException sql) {
+                throw sql;
+            } finally {
+                db.commitAndFree(connection);
+            }
+        }
     }
 
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java Mon Sep 19 23:13:33 2011
@@ -60,14 +60,13 @@ public class WsmgPersistantStorage imple
     private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
 
     private static final String TABLE_NAME_TO_CHECK = "subscription";
-    
+
     private Counter storeToDBCounter = new Counter();
 
     private JdbcStorage db = null;
 
-    String dbName = null;
+    private String dbName = null;
 
-    // private ConnectionPool connectionPool;
     public WsmgPersistantStorage(String ordinarySubsTblName, String specialSubsTblName, ConfigurationManager config)
             throws AxisFault {
 
@@ -76,18 +75,19 @@ public class WsmgPersistantStorage imple
         db = new JdbcStorage(config.getConfig(WsmgCommonConstants.CONFIG_JDBC_URL),
                 config.getConfig(WsmgCommonConstants.CONFIG_JDBC_DRIVER));
 
+        Connection conn = null;
         try {
             /*
              * Check database
              */
-            Connection conn = db.connect();
+            conn = db.connect();
             if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
                 DatabaseCreator.createMsgBrokerDatabase(conn);
                 logger.info("New Database created for Message Broker");
             } else {
                 logger.info("Database already created for Message Broker!");
             }
-            db.closeConnection(conn);
+
             // inject dbname to sql statement.
             SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY = String.format(
                     SubscriptionConstants.INSERT_SQL_QUERY, ordinarySubsTblName);
@@ -104,6 +104,24 @@ public class WsmgPersistantStorage imple
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             throw AxisFault.makeFault(e);
+        } finally {
+            if (conn != null) {
+                db.closeConnection(conn);
+            }
+        }
+    }
+
+    private void quietlyClose(Statement stmt, Connection conn) {
+        try {
+            if (stmt != null && !stmt.isClosed()) {
+                stmt.close();
+            }
+        } catch (SQLException sql) {
+            logger.error(sql.getMessage(), sql);
+        }
+
+        if (conn != null) {
+            db.closeConnection(conn);
         }
     }
 
@@ -118,14 +136,17 @@ public class WsmgPersistantStorage imple
         ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
 
         String query = "SELECT * FROM " + dbName;
-        ResultSet rs = null;
+        Connection conn = null;
+        PreparedStatement stmt = null;
         try {
 
             // get number of row first and increase the arrayList size for
             // better performance
             int size = db.countRow(dbName, "*");
 
-            rs = db.query(query);
+            conn = db.connect();
+            stmt = conn.prepareStatement(query);
+            ResultSet rs = stmt.executeQuery();
             ret.ensureCapacity(size);
 
             if (rs != null) {
@@ -139,20 +160,12 @@ public class WsmgPersistantStorage imple
         } catch (SQLException ex) {
             logger.error("sql exception occured", ex);
         } finally {
-            if (rs != null) {
-                try {
-                    db.close();
-                    rs.close();
-                } catch (SQLException ex) {
-                    logger.error("sql exception occured", ex);
-                }
-            }
+            quietlyClose(stmt, conn);
         }
         return ret;
     }
 
     public int insert(SubscriptionState subscription) {
-        PreparedStatement stmt = null;
         String address = subscription.getConsumerReference().getAddress();
         Map<QName, OMElement> referenceParametersMap = subscription.getConsumerReference().getAllReferenceParameters();
 
@@ -189,6 +202,7 @@ public class WsmgPersistantStorage imple
 
         int result = 0;
         Connection connection = null;
+        PreparedStatement stmt = null;
         try {
 
             connection = db.connect();
@@ -206,12 +220,10 @@ public class WsmgPersistantStorage imple
             stmt.setTimestamp(8, now);
             result = db.executeUpdateAndClose(stmt);
             storeToDBCounter.addCounter();
+            db.commitAndFree(connection);
         } catch (SQLException ex) {
             logger.error("sql exception occured", ex);
-        } finally {
-            if (connection != null) {
-                db.closeConnection(connection);
-            }
+            db.rollbackAndFree(connection);
         }
         return result;
     }
@@ -239,8 +251,8 @@ public class WsmgPersistantStorage imple
     public Object blockingDequeue() {
         while (true) {
             try {
-                //FIXME::: WHY RETURN KeyValueWrapper Object??????
-                //FIXME::: Can it cast to OutGoingMessage????
+                // FIXME::: WHY RETURN KeyValueWrapper Object??????
+                // FIXME::: Can it cast to OutGoingMessage????
                 KeyValueWrapper wrapper = retrive();
                 done(wrapper.getKey());
                 return wrapper.getValue();
@@ -255,10 +267,24 @@ public class WsmgPersistantStorage imple
     }
 
     public void cleanup() {
+        Connection conn = null;
+        Statement stmt = null;
         try {
-            cleanDB();
+            conn = db.connect();
+            stmt = conn.createStatement();
+            batchCleanDB(stmt, conn);
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
+        } finally {
+            if (db.isAutoCommit()) {
+                try {
+                    conn.setAutoCommit(true);
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+
+            quietlyClose(stmt, conn);
         }
     }
 
@@ -271,35 +297,45 @@ public class WsmgPersistantStorage imple
             int nextkey;
 
             connection = db.connect();
+
             lockMaxMinTables(connection);
+
             stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+
             ResultSet result = stmt.executeQuery();
 
             if (result.next()) {
                 nextkey = result.getInt(1);
                 result.close();
                 stmt.close();
+
                 stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
-                stmt.executeUpdate();
-                stmt.close();
+                db.executeUpdateAndClose(stmt);
             } else {
                 throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
             }
 
+            /*
+             * After updating MAX_ID, put the data into the queue table
+             */
             stmt = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
             stmt.setInt(1, nextkey);
             stmt.setString(2, trackId);
-            byte[] buffer;
+
             ByteArrayOutputStream output = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(output);
             out.writeObject(object);
-            buffer = output.toByteArray();
+            byte[] buffer = output.toByteArray();
             ByteArrayInputStream in = new ByteArrayInputStream(buffer);
             stmt.setBinaryStream(3, in, buffer.length);
             db.executeUpdateAndClose(stmt);
+            db.commit(connection);
+
         } catch (SQLException sqlEx) {
+            db.rollback(connection);
             logger.error("unable to enque the message in persistant storage", sqlEx);
         } catch (IOException ioEx) {
+            db.rollback(connection);
             logger.error("unable to enque the message in persistant storage", ioEx);
         } finally {
             try {
@@ -308,17 +344,7 @@ public class WsmgPersistantStorage imple
                 logger.error("Cannot Unlock Table", sql);
             }
 
-            if (connection != null) {
-                db.closeConnection(connection);
-            }
-
-            try {
-                if (stmt != null && !stmt.isClosed()) {
-                    stmt.close();
-                }
-            } catch (SQLException sql) {
-                logger.error(sql.getMessage(), sql);
-            }
+            quietlyClose(stmt, connection);
         }
     }
 
@@ -335,8 +361,8 @@ public class WsmgPersistantStorage imple
         PreparedStatement stmt = null;
         try {
             connection = db.connect();
+
             lockMaxMinTables(connection);
-            logger.debug("locked tables (maxId and minId)4");
 
             /*
              * Get Max ID
@@ -346,8 +372,7 @@ public class WsmgPersistantStorage imple
             if (!result.next()) {
                 stmt.close();
                 stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
-                stmt.executeUpdate();
-                stmt.close();
+                db.executeUpdateAndClose(stmt);
             }
 
             /*
@@ -358,27 +383,25 @@ public class WsmgPersistantStorage imple
             if (!result.next()) {
                 stmt.close();
                 stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
-                stmt.executeUpdate();
-                stmt.close();
+                db.executeUpdateAndClose(stmt);
             }
-
-            logger.debug("unlocked tables (maxId and minId)4");
+            db.commit(connection);
+        } catch (SQLException sqle) {
+            db.rollback(connection);
+            throw sqle;
         } finally {
-            if (connection != null) {
-                db.closeConnection(connection);
+            try {
+                unLockTables(connection);
+            } catch (SQLException sql) {
+                logger.error("Cannot Unlock Table", sql);
             }
 
-            unLockTables(connection);
-
-            if (stmt != null && !stmt.isClosed()) {
-                stmt.close();
-            }
+            quietlyClose(stmt, connection);
         }
     }
 
     private KeyValueWrapper retrive() throws SQLException, IOException {
         Object obj = null;
-        boolean loop = true;
         int nextkey = -1;
         int maxid = -2;
         Connection connection = null;
@@ -386,12 +409,12 @@ public class WsmgPersistantStorage imple
         ResultSet result = null;
         long wait = 1000;
 
-        while (loop) {
-            connection = db.connect();
-
+        while (true) {
             try {
+                connection = db.connect();
 
                 lockMaxMinTables(connection);
+
                 /*
                  * Get Min ID
                  */
@@ -421,41 +444,36 @@ public class WsmgPersistantStorage imple
                  */
                 if (maxid > nextkey) {
                     stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
-                    stmt.executeUpdate();
-                    stmt.close();
+                    db.executeUpdateAndClose(stmt);
                     logger.debug("Update MIN ID by one");
-
-                    unLockTables(connection);
-                    logger.debug("unlocked tables (maxId and minId)1");
+                    db.commit(connection);
                     break;
                 }
-
+                db.commit(connection);
+            } catch (SQLException sql) {
+                db.rollback(connection);
+                throw sql;
+            } finally {
                 try {
                     unLockTables(connection);
-                    logger.debug("unlocked tables (maxId and minId)1");
-
-                    wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
-                    logger.debug("Wait=" + wait);
-                    Thread.sleep(wait);
-                } catch (InterruptedException e) {
-                    logger.error(e.getMessage(), e);
-                    break;
-                }
-            } finally {
-                /*
-                 * Make sure connection is always closed
-                 */
-                if (connection != null) {
-                    db.closeConnection(connection);
+                } catch (SQLException sql) {
+                    sql.printStackTrace();
+                    logger.error("Cannot Unlock Table", sql);
                 }
 
-                if (stmt != null && !stmt.isClosed()) {
-                    try {
-                        stmt.close();
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
-                    }
-                }
+                quietlyClose(stmt, connection);
+            }
+
+            /*
+             * Sleep if there is nothing to do
+             */
+            try {
+                wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
+                logger.debug("Wait=" + wait);
+                Thread.sleep(wait);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+                break;
             }
         }
 
@@ -481,18 +499,7 @@ public class WsmgPersistantStorage imple
                         "MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
             }
         } finally {
-            if (connection != null) {
-                db.closeConnection(connection);
-            }
-
-            if (stmt != null) {
-                try {
-                    stmt.close();
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-
+            quietlyClose(stmt, connection);
         }
     }
 
@@ -510,25 +517,23 @@ public class WsmgPersistantStorage imple
             query = QueueContants.SQL_DELETE_STATEMENT + key;
             PreparedStatement stmt = connection.prepareStatement(query);
             db.executeUpdateAndClose(stmt);
-        } finally {
-            if (connection != null) {
-                db.closeConnection(connection);
-            }
+            db.commitAndFree(connection);
+        } catch (SQLException sqle) {
+            db.rollbackAndFree(connection);
+            throw sqle;
         }
     }
 
-    public void cleanDB() throws SQLException {
+    private void batchCleanDB(Statement stmt, Connection con) throws SQLException {
         DatabaseType databaseType = DatabaseType.other;
-        Connection con = null;
-        Statement stmt = null;
         int[] aiupdateCounts = new int[0];
         boolean bError = false;
         try {
-            con = db.connect();
-            stmt = con.createStatement();
-            stmt.clearBatch();
 
             con.setAutoCommit(false);
+
+            stmt.clearBatch();
+
             int totalStatement = 0;
 
             try {
@@ -568,10 +573,9 @@ public class WsmgPersistantStorage imple
             logger.error("Message:  " + bue.getMessage());
             logger.error("Vendor:  " + bue.getErrorCode());
             logger.info("Update counts:  ");
-            int[] updateCounts = bue.getUpdateCounts();
 
-            for (int i = 0; i < updateCounts.length; i++) {
-                logger.error(updateCounts[i] + "   ");
+            for (int i = 0; i < aiupdateCounts.length; i++) {
+                logger.error(aiupdateCounts[i] + "   ");
             }
 
             SQLException SQLe = bue;
@@ -579,13 +583,12 @@ public class WsmgPersistantStorage imple
                 SQLe = SQLe.getNextException();
                 logger.error(SQLe.getMessage(), SQLe);
             }
-        } // end BatchUpdateException catch
-        catch (SQLException SQLe) {
-            logger.error(SQLe.getMessage(), SQLe);
-        } // end SQLException catch
-        finally {
+        } catch (SQLException SQLe) {
+            bError = true;
+            throw SQLe;
+        } finally {
             // determine operation result
-            for (int i = 0; i < aiupdateCounts.length; i++) {
+            for (int i = 0; !bError && i < aiupdateCounts.length; i++) {
                 int iProcessed = aiupdateCounts[i];
                 /**
                  * The int values that can be returned in the update counts
@@ -604,7 +607,6 @@ public class WsmgPersistantStorage imple
                     // error on statement
                     logger.info("Error batch." + iProcessed);
                     bError = true;
-                    break;
                 }
             }
 
@@ -615,28 +617,14 @@ public class WsmgPersistantStorage imple
             }
 
             /*
-             * Close previous execution statement if error occurs
-             */
-            if (stmt != null && !stmt.isClosed()) {
-                stmt.close();
-            }
-
-            /*
              * Unlock table after rollback and commit, since it is not automatic
-             * in mysql
+             * in MySql
              */
 
             if (DatabaseType.mysql.equals(databaseType)) {
                 PreparedStatement prepareStmt = con.prepareCall("unlock tables;");
                 db.executeUpdateAndClose(prepareStmt);
             }
-
-            /*
-             * Release connection
-             */
-            db.closeConnection(con);
-
-            con.setAutoCommit(true);
         } // end finally
         logger.info("Queue is cleaned.");
     }
@@ -650,27 +638,26 @@ public class WsmgPersistantStorage imple
         }
 
         /*
-         * Must turn off autocommit
+         * Must turn off auto commit
          */
         connection.setAutoCommit(false);
         String sql = null;
-        PreparedStatement stmt = null;
+        Statement stmt = null;
         try {
             switch (databaseType) {
             case derby:
                 sql = "LOCK TABLE " + QueueContants.TABLE_NAME_MAXID + " IN EXCLUSIVE MODE";
                 String sql2 = "LOCK TABLE " + QueueContants.TABLE_NAME_MINID + " IN EXCLUSIVE MODE";
-                stmt = connection.prepareStatement(sql);
-                stmt.execute();
-                stmt.close();
-                stmt = connection.prepareStatement(sql2);
-                stmt.execute();
+                stmt = connection.createStatement();
+                stmt.addBatch(sql);
+                stmt.addBatch(sql2);
+                stmt.executeBatch();
                 break;
             case mysql:
                 sql = "lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + "," + QueueContants.TABLE_NAME_MINID
                         + " write";
-                stmt = connection.prepareStatement(sql);
-                stmt.executeQuery();
+                stmt = connection.createStatement();
+                stmt.executeQuery(sql);
                 break;
             default:
                 return;
@@ -684,7 +671,6 @@ public class WsmgPersistantStorage imple
     }
 
     private void unLockTables(Connection connection) throws SQLException {
-        String sql = "";
         DatabaseType databaseType = DatabaseType.other;
         try {
             databaseType = DatabaseCreator.getDatabaseType(connection);
@@ -695,14 +681,19 @@ public class WsmgPersistantStorage imple
         try {
             switch (databaseType) {
             case derby:
-                connection.commit();
+                /*
+                 * Derby has no explicit unlock statement; it releases table
+                 * locks on commit or rollback, so make sure the connection
+                 * is always committed or rolled back.
+                 */
                 break;
             case mysql:
-                sql = "unlock tables";
+                String sql = "unlock tables";
                 PreparedStatement stmt = null;
                 try {
                     stmt = connection.prepareStatement(sql);
                     stmt.executeQuery();
+                    db.commit(connection);
                 } finally {
                     if (stmt != null) {
                         stmt.close();
@@ -713,7 +704,12 @@ public class WsmgPersistantStorage imple
                 return;
             }
         } finally {
-            connection.setAutoCommit(true);
+            /*
+             * Restore auto commit only if db.isAutoCommit() reports it
+             */
+            if (db.isAutoCommit()) {
+                connection.setAutoCommit(true);
+            }
         }
     }