You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/17 04:32:24 UTC

svn commit: r1171884 [2/2] - in /incubator/airavata/trunk/modules/ws-messenger: commons/src/main/java/org/apache/airavata/wsmg/commons/config/ commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ messagebox/src/main/org/apache/airavata/wsmg/...

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java Sat Sep 17 02:32:24 2011
@@ -33,59 +33,48 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.StringTokenizer;
 
 import javax.xml.namespace.QName;
 import javax.xml.stream.XMLStreamException;
 
 import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
 import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.CommonRoutines;
 import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.storage.DatabaseCreator.DatabaseType;
 import org.apache.airavata.wsmg.config.WSMGParameter;
 import org.apache.airavata.wsmg.util.Counter;
 import org.apache.airavata.wsmg.util.TimerThread;
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class WsmgPersistantStorage implements WsmgStorage {
-    org.apache.log4j.Logger logger = Logger.getLogger(WsmgPersistantStorage.class);
-
-    JdbcStorage db = null;
-
-    Connection conn = null;
+    private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
 
     private Counter storeToDBCounter = new Counter();
 
-    /*
-     * this thing is never used in this context
-     */
-    // private final static XmlInfosetBuilder builder = XmlConstants.BUILDER;
+    private JdbcStorage db = null;
+
     String dbName = null;
 
     // private ConnectionPool connectionPool;
     public WsmgPersistantStorage(String ordinarySubsTblName, String specialSubsTblName, ConfigurationManager config)
             throws AxisFault {
-        /*
-         * try { conn = db.connect(); } catch (SQLException ex) { ex.printStackTrace(); }
-         */
+
         this.dbName = ordinarySubsTblName;
 
         db = new JdbcStorage(config.getConfig(WsmgCommonConstants.CONFIG_JDBC_URL),
                 config.getConfig(WsmgCommonConstants.CONFIG_JDBC_DRIVER));
 
-        // Lets connect to the database and create tables if they are not
-        // already there.
-
-
         // inject dbname to sql statement.
-
         SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY = String.format(
                 SubscriptionConstants.INSERT_SQL_QUERY, ordinarySubsTblName);
 
@@ -99,62 +88,50 @@ public class WsmgPersistantStorage imple
 
         try {
             initMessageQueueStorage();
-
         } catch (SQLException sqlEx) {
             throw AxisFault.makeFault(sqlEx);
         }
-
     }
 
     /*
      * (non-Javadoc)
      * 
-     * @see org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#getAllSubscription()
+     * @see
+     * org.apache.airavata.wsmg.commons.storage.WsmgStorage#getAllSubscription()
      */
     public List<SubscriptionEntry> getAllSubscription() {
 
         ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
 
-        int totalRecord = 0;
         String query = "SELECT * FROM " + dbName;
         ResultSet rs = null;
         try {
-            rs = db.query(query);
-            if (rs != null) {
-                // Get total number of rows
 
-                // Point to the last row in resultset.
-                rs.last();
-                // Get the row position which is also the number of rows in the
-                // ResultSet.
-                totalRecord = rs.getRow();
-                logger.debug("TotalRecord=" + totalRecord);
-                // Create String array to return
-
-                ret.ensureCapacity(totalRecord);
-
-                // Reposition at the beginning of the ResultSet to take up
-                // rs.next() call.
-//                rs.beforeFirst();
+            // Get the row count first and pre-size the ArrayList for
+            // better performance.
+            int size = db.countRow(dbName, "*");
+
+            rs = db.query(query);
+            ret.ensureCapacity(size);
 
+            if (rs != null) {
                 while (rs.next()) {
                     SubscriptionEntry subscriptionEntry = new SubscriptionEntry();
                     subscriptionEntry.setSubscriptionId(rs.getString("SubscriptionId"));
                     subscriptionEntry.setSubscribeXml(rs.getString("content"));
-
-                    /*
-                     * int policyValue = rs.getInt("wsrm"); subscriptionEntry[i] .setWsrmPolicy(policyValue ==
-                     * WsmgCommonConstants.WSRM_POLICY_TRUE);
-                     */
-
                     ret.add(subscriptionEntry);
-
                 }
-                rs.close();
             }
         } catch (SQLException ex) {
-            logger.fatal("sql exception occured", ex);
-            ret = new ArrayList<SubscriptionEntry>();
+            logger.error("sql exception occured", ex);
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException ex) {
+                    logger.error("sql exception occured", ex);
+                }
+            }
         }
         return ret;
     }
@@ -181,23 +158,21 @@ public class WsmgPersistantStorage imple
                 }
 
             }
-
             consumerReferenceParameters = buffer.toString();
-
         }
+
         int policyValue = WsmgCommonConstants.WSRM_POLICY_FALSE;
         if (subscription.isWsrmPolicy()) {
             policyValue = WsmgCommonConstants.WSRM_POLICY_TRUE;
         }
 
-        java.util.Date today = new java.util.Date();
-        java.sql.Timestamp now = new java.sql.Timestamp(today.getTime());
+        Date today = new Date();
+        Timestamp now = new Timestamp(today.getTime());
 
         String sql = subscription.isNeverExpire() ? SubscriptionConstants.SPECIAL_SUBSCRIPTION_INSERT_QUERY
                 : SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY;
 
         int result = 0;
-
         Connection connection = null;
         try {
 
@@ -205,8 +180,8 @@ public class WsmgPersistantStorage imple
             stmt = connection.prepareStatement(sql);
 
             stmt.setString(1, subscription.getId());
-            stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()),
-                    subscription.getSubscribeXml().getBytes().length);
+            stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()), subscription
+                    .getSubscribeXml().getBytes().length);
             stmt.setInt(3, policyValue);
             stmt.setString(4, subscription.getLocalTopic());
             stmt.setString(5, subscription.getXpathString());
@@ -214,137 +189,75 @@ public class WsmgPersistantStorage imple
             stmt.setBinaryStream(7, new ByteArrayInputStream(consumerReferenceParameters.getBytes()),
                     consumerReferenceParameters.getBytes().length);
             stmt.setTimestamp(8, now);
-            result = db.executeUpdate(stmt);
+            result = db.executeUpdateAndClose(stmt);
             storeToDBCounter.addCounter();
         } catch (SQLException ex) {
-            logger.fatal("sql exception occured", ex);
+            logger.error("sql exception occured", ex);
         } finally {
-
             if (connection != null) {
-                try {
-                    db.closeConnection(connection);
-                } catch (SQLException e) {
-
-                    e.printStackTrace();
-                }
+                db.closeConnection(connection);
             }
-
         }
-
         return result;
-
     }
+
     /*
      * (non-Javadoc)
      * 
-     * @see org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#delete(java.lang.String)
+     * @see
+     * org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#delete(java
+     * .lang.String)
      */
     public int delete(String subscriptionId) {
         String query;
         query = "DELETE FROM " + dbName + " WHERE SubscriptionId='" + subscriptionId + "'";
         try {
-            db.update(query);
+            return db.update(query);
         } catch (SQLException ex) {
-            logger.fatal("sql exception occured", ex);
-            // TODO : throw this exception
+            logger.error("sql exception occured", ex);
         }
         return 0;
     }
 
-    /**
-     * If data base tables are defined as SQL queries in file placed at xregistry/tables.sql in the classpath, those SQL
-     * queries are execuated against the data base. On the file, any line start # is igonred as a comment.
-     * 
-     * @throws XregistryException
-     */
-    private void initDatabaseTables(JdbcStorage jdbcStorage) throws AxisFault {
-        ClassLoader cl = Thread.currentThread().getContextClassLoader();
-        InputStream sqltablesAsStream = cl.getResourceAsStream("broker-tables.sql");
-
-        if (sqltablesAsStream == null) {
-            return;
-        }
-
-        Connection connection = jdbcStorage.connect();
-        try {
-            Statement statement = connection.createStatement();
-
-            String docAsStr = CommonRoutines.readFromStream(sqltablesAsStream);
-            StringTokenizer t = new StringTokenizer(docAsStr, ";");
-
-            while (t.hasMoreTokens()) {
-                String line = t.nextToken();
-                if (line.trim().length() > 0 && !line.startsWith("#")) {
-                    System.out.println(line.trim());
-                    statement.executeUpdate(line.trim());
-                }
-            }
-        } catch (SQLException e) {
-            throw AxisFault.makeFault(e);
-        } catch (IOException e) {
-            throw AxisFault.makeFault(e);
-        } finally {
-            try {
-                jdbcStorage.closeConnection(connection);
-            } catch (SQLException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        }
-
-    }
-
     // QUEUE related
 
     public Object blockingDequeue() {
-
-        boolean gotValue = false;
-
-        while (!gotValue) {
+        while (true) {
             try {
                 KeyValueWrapper wrapper = retrive();
-
                 done(wrapper.getKey());
                 return wrapper.getValue();
             } catch (SQLException e) {
-                // TODO Auto-generated catch block
+                logger.error(e.getMessage(), e);
                 e.printStackTrace();
             } catch (IOException e) {
-                // TODO Auto-generated catch block
+                logger.error(e.getMessage(), e);
                 e.printStackTrace();
             }
         }
-        return null;
-
     }
 
     public void cleanup() {
-
         try {
             cleanDB();
-            initMessageQueueStorage();
         } catch (SQLException e) {
-            logger.error(e);
+            logger.error(e.getMessage(), e);
         }
-
     }
 
     public void enqueue(Object object, String trackId) {
 
         // Get the Max ID cache and update and unlock the table
         Connection connection = null;
+        PreparedStatement stmt = null;
         try {
-            connection = db.connect();
-
-            PreparedStatement stmt = null;
-            lockTables(connection,stmt);
-            // System.out.println("locked maxId table");
+            int nextkey;
 
+            connection = db.connect();
+            lockMaxMinTables(connection);
             stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
             ResultSet result = stmt.executeQuery();
 
-            int nextkey;
-
             if (result.next()) {
                 nextkey = result.getInt(1);
                 result.close();
@@ -353,17 +266,10 @@ public class WsmgPersistantStorage imple
                 stmt.executeUpdate();
                 stmt.close();
             } else {
-                nextkey = 1;
-                result.close();
-                stmt.close();
-                stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
-                stmt.executeUpdate();
-                stmt.close();
+                throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
             }
-            unLockTables(connection,stmt);
 
             stmt = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
-
             stmt.setInt(1, nextkey);
             stmt.setString(2, trackId);
             byte[] buffer;
@@ -373,22 +279,29 @@ public class WsmgPersistantStorage imple
             buffer = output.toByteArray();
             ByteArrayInputStream in = new ByteArrayInputStream(buffer);
             stmt.setBinaryStream(3, in, buffer.length);
-            db.executeUpdate(stmt);
+            db.executeUpdateAndClose(stmt);
         } catch (SQLException sqlEx) {
             logger.error("unable to enque the message in persistant storage", sqlEx);
         } catch (IOException ioEx) {
             logger.error("unable to enque the message in persistant storage", ioEx);
-
         } finally {
+            try {
+                unLockTables(connection);
+            } catch (SQLException sql) {
+                logger.error("Cannot Unlock Table", sql);
+            }
 
             if (connection != null) {
-                try {
-                    db.closeConnection(connection);
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                }
+                db.closeConnection(connection);
             }
 
+            try {
+                if (stmt != null && !stmt.isClosed()) {
+                    stmt.close();
+                }
+            } catch (SQLException sql) {
+                logger.error(sql.getMessage(), sql);
+            }
         }
     }
 
@@ -401,201 +314,236 @@ public class WsmgPersistantStorage imple
     }
 
     private void initMessageQueueStorage() throws SQLException {
-        Connection connection = db.connect();
-        String sql = "";
-        String databaseType = "";
+        Connection connection = null;
         PreparedStatement stmt = null;
-        lockTables(connection,stmt);
-        // System.out.println("locked tables (maxId and minId)4");
-        stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
-        ResultSet result = stmt.executeQuery();
-        if (!result.next()) {
-            result.close();
-            stmt.close();
-            stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
-            stmt.executeUpdate();
-            stmt.close();
-        }
-
-        stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
-        result = stmt.executeQuery();
-
-        if (!result.next()) {
-            result.close();
-            stmt.close();
-            stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
-            stmt.executeUpdate();
-            stmt.close();
-        }
-        unLockTables(connection,stmt);
-        db.closeConnection(connection);
-        // System.out.println("unlocked tables (maxId and minId)4");
+        try {
+            connection = db.connect();
+            lockMaxMinTables(connection);
+            logger.debug("locked tables (maxId and minId)4");
+
+            /*
+             * Get Max ID
+             */
+            stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+            ResultSet result = stmt.executeQuery();
+            if (!result.next()) {
+                stmt.close();
+                stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
+                stmt.executeUpdate();
+                stmt.close();
+            }
+
+            /*
+             * Get Min ID
+             */
+            stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+            result = stmt.executeQuery();
+            if (!result.next()) {
+                stmt.close();
+                stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
+                stmt.executeUpdate();
+                stmt.close();
+            }
+
+            logger.debug("unlocked tables (maxId and minId)4");
+        } finally {
+            if (connection != null) {
+                db.closeConnection(connection);
+            }
+
+            unLockTables(connection);
+
+            if (stmt != null && !stmt.isClosed()) {
+                stmt.close();
+            }
+        }
     }
 
-    public KeyValueWrapper retrive() throws SQLException, IOException {
+    private KeyValueWrapper retrive() throws SQLException, IOException {
         Object obj = null;
-
-        // Get the smallest id
-        Connection connection = db.connect();
         boolean loop = true;
-
         int nextkey = -1;
         int maxid = -2;
+        Connection connection = null;
         PreparedStatement stmt = null;
         ResultSet result = null;
-        boolean connectionClosed = false;
         long wait = 1000;
-        while (loop) {
-            lockTables(connection,stmt);
-            stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
-            result = stmt.executeQuery();
-
-            if (result.next()) {
-                nextkey = result.getInt(1);
-                result.close();
-                stmt.close();
 
-            } else {
-                throw new RuntimeException("Queue init has failed earlier");
-            }
+        while (loop) {
+            connection = db.connect();
 
-            stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
-            result = stmt.executeQuery();
+            try {
 
-            if (result.next()) {
-                maxid = result.getInt(1);
-                result.close();
-                stmt.close();
+                lockMaxMinTables(connection);
+                /*
+                 * Get Min ID
+                 */
+                stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+                result = stmt.executeQuery();
+                if (result.next()) {
+                    nextkey = result.getInt(1);
+                    stmt.close();
+                } else {
+                    throw new RuntimeException("Queue init has failed earlier");
+                }
 
-            } else {
-                throw new RuntimeException("Queue init has failed earlier");
-            }
+                /*
+                 * Get Max ID
+                 */
+                stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+                result = stmt.executeQuery();
+                if (result.next()) {
+                    maxid = result.getInt(1);
+                    stmt.close();
+                } else {
+                    throw new RuntimeException("Queue init has failed earlier");
+                }
 
+                /*
+                 * Update value and exit the loop
+                 */
+                if (maxid > nextkey) {
+                    stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
+                    stmt.executeUpdate();
+                    stmt.close();
+                    logger.debug("Update MIN ID by one");
 
-            unLockTables(connection,stmt);
-            // System.out.println("unlocked tables (maxId and minId)1");
-            if (maxid > nextkey) {
-                loop = false;
-                stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
-                stmt.executeUpdate();
-                stmt.close();
-                unLockTables(connection,stmt);
-                // System.out.println("unlocked tables (maxId and minId) 2");
+                    unLockTables(connection);
+                    logger.debug("unlocked tables (maxId and minId)1");
+                    break;
+                }
 
-            } else {
                 try {
-                    unLockTables(connection,stmt);
-                    db.closeConnection(connection);
-                    connectionClosed = true;
+                    unLockTables(connection);
+                    logger.debug("unlocked tables (maxId and minId)1");
+
                     wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
-                    // System.out.println("Wait="+wait);
+                    logger.debug("Wait=" + wait);
                     Thread.sleep(wait);
                 } catch (InterruptedException e) {
+                    logger.error(e.getMessage(), e);
+                    break;
+                }
+            } finally {
+                /*
+                 * Make sure connection is always closed
+                 */
+                if (connection != null) {
+                    db.closeConnection(connection);
+                }
 
+                if (stmt != null && !stmt.isClosed()) {
+                    try {
+                        stmt.close();
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                    }
                 }
             }
-            if (connectionClosed) {
-                connection = db.connect();
-                connectionClosed = false;
-            }
         }
 
-        stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
-        result = stmt.executeQuery();
-        // FIXME: THis loop caused out-of-memory
-        while (!result.next()) {
-            // FIXME Remove this while loop and change the
-            // order of db update if possible in the save.
-            result.close();
-            stmt.close();
-            // System.out.println("Looping in 1");
-            try {
-                Thread.sleep(2000);
-            } catch (InterruptedException e) {
-
-                logger.error("interruped while thread sleep", e);
-            }
+        /*
+         * Create Subscription Object from MIN_ID
+         */
+        try {
+            connection = db.connect();
             stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
             result = stmt.executeQuery();
+            if (result.next()) {
+                int id = result.getInt(1);
+                InputStream in = result.getAsciiStream(2);
+                ObjectInputStream s = new ObjectInputStream(in);
+                try {
+                    obj = s.readObject();
+                } catch (ClassNotFoundException e) {
+                    logger.error(e.getMessage(), e);
+                }
+                return new KeyValueWrapper(id, obj);
+            } else {
+                throw new RuntimeException(
+                        "MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
+            }
+        } finally {
+            if (connection != null) {
+                db.closeConnection(connection);
+            }
 
-        }
-        int id = result.getInt(1);
-        InputStream in = result.getAsciiStream(2);
-        ObjectInputStream s = new ObjectInputStream(in);
-        try {
-            obj = s.readObject();
-        } catch (ClassNotFoundException e) {
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
 
-            e.printStackTrace();
         }
-        result.close();
-        stmt.close();
-
-        db.closeConnection(connection);
-
-        return new KeyValueWrapper(id, obj);
-
     }
 
-    public void done(int key) throws SQLException {
+    /**
+     * Delete data in subscription table once it has been read
+     * 
+     * @param key
+     * @throws SQLException
+     */
+    private void done(int key) throws SQLException {
         String query = null;
-        PreparedStatement stmt = null;
-        Connection connection = db.connect();
+        Connection connection = null;
         try {
+            connection = db.connect();
             query = QueueContants.SQL_DELETE_STATEMENT + key;
-            // System.out.println("Deleting key="+key);
-            stmt = connection.prepareStatement(query);
-            db.executeUpdate(stmt);
-            stmt.close();
+            PreparedStatement stmt = connection.prepareStatement(query);
+            db.executeUpdateAndClose(stmt);
         } finally {
-            db.closeConnection(connection);
+            if (connection != null) {
+                db.closeConnection(connection);
+            }
         }
-
     }
 
     public void cleanDB() throws SQLException {
-        Connection con = db.connect();
-        Statement stmt = con.createStatement();
+        DatabaseType databaseType = DatabaseType.other;
+        Connection con = null;
+        Statement stmt = null;
         int[] aiupdateCounts = new int[0];
         boolean bError = false;
         try {
+            con = db.connect();
+            stmt = con.createStatement();
+            stmt.clearBatch();
 
             con.setAutoCommit(false);
-            // ...
-            bError = false;
-            stmt.clearBatch();
             int totalStatement = 0;
-            String databaseType = "";
+
             try {
-                databaseType = DatabaseCreator.getDatabaseType(conn);
+                databaseType = DatabaseCreator.getDatabaseType(con);
             } catch (Exception e) {
-                logger.error("Error evaluating database type");
+                logger.error("Error evaluating database type", e);
             }
             // add SQL statements
-            if("mysql".equals(databaseType)){
+            if (DatabaseType.mysql.equals(databaseType)) {
                 stmt.addBatch("lock tables disQ write, MaxIDTable write, MinIDTable write;");
+                totalStatement++;
+            } else if (DatabaseType.derby.equals(databaseType)) {
+                stmt.addBatch("lock table disQ execusive mode;");
+                totalStatement++;
+                stmt.addBatch("lock table MaxIDTable execusive mode;");
+                totalStatement++;
+                stmt.addBatch("lock table MinIDTable execusive mode;");
+                totalStatement++;
             }
-            // System.out.println("locked tables (maxId and minId) 5");
             stmt.addBatch("Delete from disQ;");
+            totalStatement++;
             stmt.addBatch("Delete from MaxIDTable;");
+            totalStatement++;
             stmt.addBatch("Delete from MinIDTable;");
-
-            if ("mysql".equals(databaseType)) {
-                stmt.addBatch("unlock tables;");
-            }
-            // System.out.println("unlocked tables (maxId and minId) 5");
-            totalStatement = 5;
+            totalStatement++;
 
             aiupdateCounts = new int[totalStatement];
 
             // execute the statements
             aiupdateCounts = stmt.executeBatch();
 
-        } // end try
-
-        // catch blocks
-        // ...
-        catch (BatchUpdateException bue) {
+        } catch (BatchUpdateException bue) {
             bError = true;
             aiupdateCounts = bue.getUpdateCounts();
             logger.error("SQLException: " + bue.getMessage());
@@ -611,53 +559,147 @@ public class WsmgPersistantStorage imple
 
             SQLException SQLe = bue;
             while (SQLe != null) {
-                // do exception stuff
-
                 SQLe = SQLe.getNextException();
-                logger.error(SQLe);
+                logger.error(SQLe.getMessage(), SQLe);
             }
         } // end BatchUpdateException catch
         catch (SQLException SQLe) {
-            // ...
-            logger.error(SQLe);
+            logger.error(SQLe.getMessage(), SQLe);
         } // end SQLException catch
         finally {
-            db.closeConnection(con);
             // determine operation result
             for (int i = 0; i < aiupdateCounts.length; i++) {
                 int iProcessed = aiupdateCounts[i];
-                // The int values that can be returned in the update counts
-                // array are:
-                // -3--Operation error. A driver has the option to stop at the
-                // first error and throw a BatchUpdateException or to report the
-                // error and continue. This value is only seen in the latter
-                // case.
-                // -2--The operation was successful, but the number of rows
-                // affected is unknown.
-                // Zero--DDL statement or no rows affected by the operation.
-                // Greater than zero--Operation was successful, number of rows
-                // affected by the operation.
-                if (iProcessed > 0 || iProcessed == -2) {
-                    // statement was successful
-                    // ...
-                } else {
+                /**
+                 * The int values that can be returned in the update counts
+                 * array are: <br/>
+                 * -3--Operation error. A driver has the option to stop at the
+                 * first error and throw a BatchUpdateException or to report the
+                 * error and continue. This value is only seen in the latter
+                 * case. <br/>
+                 * -2--The operation was successful, but the number of rows
+                 * affected is unknown. <br/>
+                 * Zero--DDL statement or no rows affected by the operation.
+                 * Greater than zero--Operation was successful, number of rows
+                 * affected by the operation.
+                 */
+                if (iProcessed < 0 && iProcessed != -2) {
                     // error on statement
-                    logger.info("Batch update successful.");
+                    logger.info("Error batch." + iProcessed);
                     bError = true;
                     break;
                 }
-            } // end for
+            }
 
             if (bError) {
                 con.rollback();
             } else {
                 con.commit();
             }
+
+            /*
+             * Close previous execution statement if error occurs
+             */
+            if (stmt != null && !stmt.isClosed()) {
+                stmt.close();
+            }
+
+            /*
+             * Unlock table after rollback and commit, since it is not automatic
+             * in mysql
+             */
+
+            if (DatabaseType.mysql.equals(databaseType)) {
+                PreparedStatement prepareStmt = con.prepareCall("unlock tables;");
+                db.executeUpdateAndClose(prepareStmt);
+            }
+
+            /*
+             * Release connection
+             */
+            db.closeConnection(con);
+
             con.setAutoCommit(true);
         } // end finally
         logger.info("Queue is cleaned.");
     }
 
+    /**
+     * Switches auto-commit off and, on Derby and MySQL, takes exclusive/write locks on the
+     * max-ID and min-ID tables. On any other database no lock statement is issued.
+     *
+     * @param connection connection that will hold the locks
+     * @throws SQLException if auto-commit cannot be disabled or a lock statement fails
+     */
+    private void lockMaxMinTables(Connection connection) throws SQLException {
+        DatabaseType databaseType = DatabaseType.other;
+        try {
+            databaseType = DatabaseCreator.getDatabaseType(connection);
+        } catch (Exception e) {
+            logger.error("Error evaluating database type", e);
+        }
+
+        // Must turn off autocommit before locking
+        connection.setAutoCommit(false);
+        // Lock statements return no ResultSet, so they are run with execute(), not executeQuery().
+        Statement stmt = null;
+        try {
+            switch (databaseType) {
+            case derby:
+                // Derby syntax is LOCK TABLE <table> IN EXCLUSIVE MODE; the IN keyword is mandatory.
+                stmt = connection.createStatement();
+                stmt.execute("LOCK TABLE " + QueueContants.TABLE_NAME_MAXID + " IN EXCLUSIVE MODE");
+                stmt.execute("LOCK TABLE " + QueueContants.TABLE_NAME_MINID + " IN EXCLUSIVE MODE");
+                break;
+            case mysql:
+                stmt = connection.createStatement();
+                stmt.execute("lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + ","
+                        + QueueContants.TABLE_NAME_MINID + " write");
+                break;
+            default:
+                return;
+            }
+        } finally {
+            if (stmt != null) {
+                stmt.close();
+            }
+        }
+    }
+
+    /**
+     * Releases the table locks held on the connection (commit on Derby, "unlock tables" on
+     * MySQL, nothing on other databases) and always switches auto-commit back on.
+     * @param connection connection on which the locks were taken
+     * @throws SQLException if the commit, the unlock statement or re-enabling auto-commit fails
+     */
+    private void unLockTables(Connection connection) throws SQLException {
+        DatabaseType databaseType = DatabaseType.other;
+        try {
+            databaseType = DatabaseCreator.getDatabaseType(connection);
+        } catch (Exception e) {
+            logger.error("Error evaluating database type", e);
+        }
+        try {
+            switch (databaseType) {
+            case derby:
+                connection.commit();
+                break;
+            case mysql:
+                Statement stmt = connection.createStatement();
+                try {
+                    stmt.execute("unlock tables"); // no ResultSet comes back, so not executeQuery()
+                } finally {
+                    stmt.close();
+                }
+                break;
+            default:
+                break;
+            }
+        } finally {
+            connection.setAutoCommit(true);
+        }
+    }
+
     private static class SubscriptionConstants {
 
         public static String INSERT_SQL_QUERY = "INSERT INTO %s(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
@@ -666,7 +708,6 @@ public class WsmgPersistantStorage imple
         public static String ORDINARY_SUBSCRIPTION_INSERT_QUERY = null;
 
         public static String SPECIAL_SUBSCRIPTION_INSERT_QUERY = null;
-
     }
 
     private static class QueueContants {
@@ -700,46 +741,5 @@ public class WsmgPersistantStorage imple
         public static String SQL_MIN_ID_INCREMENT = "UPDATE " + TABLE_NAME_MINID + " SET minID = minID+1 WHERE minID =";
 
     }
-    private void lockTables(Connection connection,PreparedStatement stmt)throws SQLException{
-        String sql = "";
-        String databaseType = "";
-        try {
-            databaseType = DatabaseCreator.getDatabaseType(connection);
-        } catch (Exception e) {
-            logger.error("Error evaluating database type");
-        }
-
-        if ("derby".equals(databaseType)) {
-//            sql = "lock table " + QueueContants.TABLE_NAME_MAXID + " IN SHARE MODE";
-//            stmt = connection.prepareStatement(sql);
-//            stmt.executeUpdate();
-//            sql = "lock table " + QueueContants.TABLE_NAME_MINID + " IN SHARE MODE";
-//            connection.prepareStatement(sql);
-//            stmt.executeUpdate();
-//            stmt.close();
-        } else if ("mysql".equals(databaseType)) {
-            sql = "lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + ","
-                    + QueueContants.TABLE_NAME_MINID + " write";
-            stmt = connection.prepareStatement(sql);
-            stmt.executeQuery();
-            stmt.close();
-        }
-    }
-        private void unLockTables(Connection connection,PreparedStatement stmt)throws SQLException{
-        String sql = "";
-        String databaseType = "";
-        try {
-            databaseType = DatabaseCreator.getDatabaseType(connection);
-        } catch (Exception e) {
-            logger.error("Error evaluating database type");
-        }
-
-        if ("mysql".equals(databaseType)) {
-            sql = "unlock tables";
-            stmt = connection.prepareStatement(sql);
-            stmt.executeQuery();
-            stmt.close();
-        }
-    }
 
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java Sat Sep 17 02:32:24 2011
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.airavata.wsmg.broker.NotificationProcessor;
 import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
 import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
 import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
 import org.apache.airavata.wsmg.matching.XPath.YFilterMessageMatcher;
@@ -38,15 +39,9 @@ public class WsmgConfigurationContext {
 
     private OutGoingQueue outgoingQueue = null;
 
-    // private AbstractMessageMatcher messageMatcher;
-
     private List<AbstractMessageMatcher> messageMatchers = new LinkedList<AbstractMessageMatcher>();
 
-    private Map<Object, Object> publisherRegistrationDB = new HashMap<Object, Object>();// TODO:
-
-    // parameterize
-    // the
-    // map
+    private Map<Object, Object> publisherRegistrationDB = new HashMap<Object, Object>();
 
     private ReentrantReadWriteLock messegeMatchersLock = new ReentrantReadWriteLock();
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java Sat Sep 17 02:32:24 2011
@@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
 import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
 import org.apache.airavata.wsmg.util.RunTimeStatistics;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java Sat Sep 17 02:32:24 2011
@@ -31,7 +31,7 @@ import org.apache.airavata.wsmg.broker.C
 import org.apache.airavata.wsmg.commons.CommonRoutines;
 import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
 import org.apache.airavata.wsmg.commons.WsmgNameSpaceConstants;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
 import org.apache.airavata.wsmg.config.WSMGParameter;
 import org.apache.airavata.wsmg.messenger.protocol.Axis2Protocol;
 import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java Sat Sep 17 02:32:24 2011
@@ -31,7 +31,7 @@ import org.apache.airavata.wsmg.broker.C
 import org.apache.airavata.wsmg.commons.CommonRoutines;
 import org.apache.airavata.wsmg.commons.OutGoingMessage;
 import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
 import org.apache.airavata.wsmg.config.WSMGParameter;
 import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
 import org.apache.airavata.wsmg.messenger.SenderUtils;

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java Sat Sep 17 02:32:24 2011
@@ -35,7 +35,7 @@ import java.util.concurrent.locks.Reentr
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
 import org.apache.airavata.wsmg.commons.CommonRoutines;
 import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
 import org.apache.airavata.wsmg.config.WSMGParameter;
 import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
 import org.apache.airavata.wsmg.messenger.SenderUtils;

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java Sat Sep 17 02:32:24 2011
@@ -30,7 +30,7 @@ import org.apache.airavata.wsmg.broker.A
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
 import org.apache.airavata.wsmg.commons.CommonRoutines;
 import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
 import org.apache.airavata.wsmg.config.WSMGParameter;
 import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
 import org.apache.airavata.wsmg.messenger.SenderUtils;

Modified: incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java Sat Sep 17 02:32:24 2011
@@ -21,7 +21,6 @@
 
 package org.apache.airavata.wsmg.messenger;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 
@@ -33,8 +32,8 @@ import javax.servlet.http.HttpServletRes
 
 import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
 import org.apache.airavata.wsmg.commons.WsmgVersion;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
 import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
 import org.apache.airavata.wsmg.config.WSMGParameter;
 import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
 import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;