You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/20 01:13:34 UTC
svn commit: r1172890 - in /incubator/airavata/trunk/modules/ws-messenger:
commons/src/main/java/org/apache/airavata/wsmg/commons/storage/
messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/
messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/
messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/
Author: patanachai
Date: Mon Sep 19 23:13:33 2011
New Revision: 1172890
URL: http://svn.apache.org/viewvc?rev=1172890&view=rev
Log:
AIRAVATA-82 fix synchronization and always release table lock
Modified:
incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java
incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java Mon Sep 19 23:13:33 2011
@@ -105,6 +105,15 @@ public class ConnectionPool {
initialize(initialConnections, maxConnections, waitIfBusy);
}
+ /**
+ * Check if this connection pool is auto commit or not
+ *
+ * @return
+ */
+ public boolean isAutoCommit() {
+ return this.autoCommit;
+ }
+
private void initialize(int initialConnections, int maxConnections, boolean waitIfBusy) throws SQLException {
this.maxConnections = maxConnections;
this.waitIfBusy = waitIfBusy;
Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java Mon Sep 19 23:13:33 2011
@@ -32,10 +32,6 @@ import org.slf4j.LoggerFactory;
public class JdbcStorage {
private static Logger log = LoggerFactory.getLogger(JdbcStorage.class);
- private PreparedStatement stmt = null;
-
- private ResultSet rs = null;
-
private ConnectionPool connectionPool;
public JdbcStorage(String jdbcUrl, String jdbcDriver) {
@@ -50,12 +46,55 @@ public class JdbcStorage {
Connection.TRANSACTION_SERIALIZABLE);
} else {
connectionPool = new ConnectionPool(driver, url, initCon, maxCon, true);
- }
+ }
} catch (Exception e) {
throw new RuntimeException("Failed to create database connection pool.", e);
}
}
+ /**
+ * Check if this connection pool is auto commit or not
+ *
+ * @return
+ */
+ public boolean isAutoCommit() {
+ return connectionPool.isAutoCommit();
+ }
+
+ public void commit(Connection conn) {
+ try {
+ if (conn != null && !conn.getAutoCommit()) {
+ conn.commit();
+ }
+ } catch (SQLException sqle) {
+ log.error("Cannot commit data", sqle);
+ }
+ }
+
+ public void commitAndFree(Connection conn) {
+ commit(conn);
+ if (conn != null) {
+ closeConnection(conn);
+ }
+ }
+
+ public void rollback(Connection conn) {
+ try {
+ if (conn != null && !conn.getAutoCommit()) {
+ conn.rollback();
+ }
+ } catch (SQLException sqle) {
+ log.error("Cannot Rollback data", sqle);
+ }
+ }
+
+ public void rollbackAndFree(Connection conn) {
+ rollback(conn);
+ if (conn != null) {
+ closeConnection(conn);
+ }
+ }
+
public Connection connect() {
Connection conn = null;
@@ -75,17 +114,21 @@ public class JdbcStorage {
public int update(String query) throws SQLException {
int result = 0;
Connection conn = null;
+ PreparedStatement stmt = null;
try {
conn = connectionPool.getConnection();
stmt = conn.prepareStatement(query);
result = stmt.executeUpdate();
+ commit(conn);
+ } catch (SQLException sql) {
+ rollback(conn);
+ throw sql;
} finally {
- if (conn != null) {
- connectionPool.free(conn);
- }
-
if (stmt != null && !stmt.isClosed()) {
stmt.close();
+ }
+ if(conn!=null){
+ closeConnection(conn);
}
}
return result;
@@ -111,45 +154,24 @@ public class JdbcStorage {
return rows;
}
- public ResultSet query(String query) throws SQLException {
- ResultSet rs = null;
- Connection conn = null;
- try {
- conn = connectionPool.getConnection();
- stmt = conn.prepareStatement(query);
- rs = stmt.executeQuery();
- conn.setAutoCommit(false);
- } finally {
- if (conn != null) {
- connectionPool.free(conn);
- }
- }
- return rs;
- }
-
- public void close() throws SQLException {
- if (stmt != null && !stmt.isClosed()) {
- stmt.close();
- }
- }
-
public int countRow(String tableName, String columnName) throws SQLException {
String query = new String("SELECT COUNT(" + columnName + ") FROM " + tableName);
int count = -1;
Connection conn = null;
+ PreparedStatement stmt = null;
try {
conn = connectionPool.getConnection();
stmt = conn.prepareStatement(query);
- rs = stmt.executeQuery();
+ ResultSet rs = stmt.executeQuery();
rs.next();
count = rs.getInt(1);
} finally {
- if (conn != null) {
- connectionPool.free(conn);
- }
if (stmt != null && !stmt.isClosed()) {
stmt.close();
}
+ if (conn != null) {
+ connectionPool.free(conn);
+ }
}
return count;
}
@@ -162,34 +184,19 @@ public class JdbcStorage {
public int insert(String query) throws SQLException {
int rows = 0;
Connection conn = null;
+ PreparedStatement stmt = null;
try {
conn = connectionPool.getConnection();
stmt = conn.prepareStatement(query);
rows = stmt.executeUpdate();
} finally {
- if (conn != null) {
- connectionPool.free(conn);
- }
if (stmt != null && !stmt.isClosed()) {
stmt.close();
}
- }
- return rows;
- }
- /**
- * This method is provided so that yo can have better control over the
- * statement. For example: You can use stmt.setString to convert quotation
- * mark automatically in an INSERT statement
- *
- * NOTE: Statement is closed after execution
- */
- public int insertAndClose(PreparedStatement stmt) throws SQLException {
- int rows = 0;
- try {
- rows = stmt.executeUpdate();
- } finally {
- stmt.close();
+ if (conn != null) {
+ connectionPool.free(conn);
+ }
}
return rows;
}
@@ -197,5 +204,5 @@ public class JdbcStorage {
public void closeAllConnections() {
if (connectionPool != null)
connectionPool.dispose();
- }
+ }
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java Mon Sep 19 23:13:33 2011
@@ -26,19 +26,16 @@ import java.util.List;
import org.apache.axiom.om.OMElement;
/**
- * Message Box storage backend. This has implemented in two ways inmemory and database.
+ * Message Box storage backend. This has implemented in two ways inmemory and
+ * database.
*/
public interface MsgBoxStorage {
public String createMsgBox() throws Exception;
- // DELETE FROM msgbox WHERE msgboxid=key
public void destroyMsgBox(String key) throws Exception;
- // SELECT * FROM msgbox WHERE msgboxid=key ORDER BY id LIMIT 1
- // DELETE FROM msgbox WHERE msgboxid=key AND id=*
public List<String> takeMessagesFromMsgBox(String key) throws Exception;
- // INSERT INTO msgbox ...
public void putMessageIntoMsgBox(String msgBoxID, String messageID, String soapAction, OMElement message)
throws Exception;
@@ -46,6 +43,4 @@ public interface MsgBoxStorage {
* The ancientness is defined in the db.config file.
*/
public void removeAncientMessages() throws Exception;
-
- // public void closeConnection() throws Exception;
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java Mon Sep 19 23:13:33 2011
@@ -36,21 +36,15 @@ import org.apache.axiom.om.OMElement;
* Database message Storage Implementation, if msgBox.properties configured to use database this will set as the storage
* for MsgBoxSerivceSkeleton
*/
-public class DatabaseStorageImpl implements MsgBoxStorage {
-
- // private MessageBoxDB messageBoxDB;
-
- // private final static MLogger logger = MLogger.getLogger();
+public class DatabaseStorageImpl implements MsgBoxStorage {
public DatabaseStorageImpl(JdbcStorage db, long timeOfOldMessage) throws SQLException {
MessageBoxDB.initialize(db, timeOfOldMessage);
}
public String createMsgBox() throws SQLException, IOException {
- // String uuid = FastUUIDGen.nextUUID();// generate uuid
-
String uuid = UUID.randomUUID().toString();
- MessageBoxDB.getInstance().createMsgBx(uuid, "msgBoxes");
+ MessageBoxDB.getInstance().createMsgBx(uuid);
return uuid;
}
@@ -60,13 +54,9 @@ public class DatabaseStorageImpl impleme
} catch (SQLException e) {
throw new Exception("Could not destroy the message box with key " + key, e);
}
- // logger.finest("Message box with key " + key +
- // " was destroyed successfully");
-
}
public List<String> takeMessagesFromMsgBox(String key) throws Exception {
- // String[] message;
List<String> list = null;
try {
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java Mon Sep 19 23:13:33 2011
@@ -31,7 +31,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.Timestamp;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
@@ -43,87 +43,116 @@ import javax.xml.stream.XMLStreamExcepti
import org.apache.airavata.wsmg.commons.storage.JdbcStorage;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This is the core class which used by DatabaseStorageImpl to perform all the service operations, DatabaseStorageImpl class
- * simply use this class in its operation methods to perform the actual funcationality.
+ * This is the core class which used by DatabaseStorageImpl to perform all the
+ * service operations, DatabaseStorageImpl class simply use this class in its
+ * operation methods to perform the actual funcationality.
*/
public class MessageBoxDB {
- static Logger logger = Logger.getLogger(MessageBoxDB.class);
+ private static final String MSGBOXES_TABLENAME = "msgBoxes";
+ private static final String MSGBOX_TABLENAME = "msgbox";
+
+ private static final Logger logger = LoggerFactory.getLogger(MessageBoxDB.class);
private static Set<String> msgBoxids;
- public static final String SELECT_ALL_FROM_MSGBOXES = "SELECT * FROM msgBoxes";
+ public static final String SELECT_ALL_FROM_MSGBOXES = "SELECT * FROM " + MSGBOXES_TABLENAME;
+
+ public static final String SQL_CREATE_MSGBOXES_STATEMENT = "INSERT INTO " + MSGBOXES_TABLENAME
+ + " (msgboxid) VALUES (?)";
- public static String SQL_STORE_MESSAGE_STATEMENT = "INSERT INTO msgbox (content, msgboxid, messageid,soapaction) VALUES (?,?,?,?)";
+ public static final String SQL_DELETE_MSGBOXES_STATEMENT = "DELETE FROM " + MSGBOXES_TABLENAME
+ + " WHERE msgboxid = ?";
- public static String SQL_CREATE_MSGBOX_STATEMENT = "INSERT INTO %s (msgboxid) VALUES ('%s')";
+ public static final String SQL_STORE_MESSAGE_STATEMENT = "INSERT INTO " + MSGBOX_TABLENAME
+ + " (content, msgboxid, messageid,soapaction) VALUES (?,?,?,?)";
- public static String SQL_DELETE_ALL_STATEMENT = "DELETE FROM %s WHERE msgboxid='%s'";
+ public static final String SQL_SELECT_MSGBOX_STATEMENT = "SELECT * FROM " + MSGBOX_TABLENAME
+ + " WHERE msgboxid = ? ORDER BY time ";
- public static String SQL_SELECT_STATEMENT1 = "SELECT * FROM %s WHERE msgboxid='%s' ORDER BY time ";
+ public static final String SQL_DELETE_MSGBOX_STATEMENT = "DELETE FROM " + MSGBOX_TABLENAME + " WHERE msgboxid = ?";
+
+ public static final String SQL_DELETE_ANCIENT_STATEMENT = "DELETE FROM " + MSGBOX_TABLENAME + " WHERE time < ?";
- public static String SQL_DELETE_ANCIENT_STATEMENT = "DELETE FROM %s WHERE time <'%s'";
-
private JdbcStorage db;
-
+
private static MessageBoxDB instance;
-
- private long time;
- private MessageBoxDB(JdbcStorage db) {
+ private long time;
+
+ private MessageBoxDB(JdbcStorage db, long time) {
this.db = db;
+ this.time = time;
}
-
- public static MessageBoxDB initialize(JdbcStorage db, long time) throws SQLException{
- if(instance == null){
- instance = new MessageBoxDB(db);
+
+ public static MessageBoxDB initialize(JdbcStorage db, long time) throws SQLException {
+ if (instance == null) {
+ instance = new MessageBoxDB(db, time);
setMsgBoxidList(db);
}
return instance;
}
-
- public static MessageBoxDB getInstance(){
- if(instance==null){
+
+ public static MessageBoxDB getInstance() {
+ if (instance == null) {
throw new RuntimeException("Please initialize this object first using initialize(JdbcStorage, long)");
}
return instance;
}
- public void createMsgBx(String messageBoxId, String tableName) throws SQLException, IOException {
+ public void createMsgBx(String messageBoxId) throws SQLException, IOException {
if (!msgBoxids.contains(messageBoxId)) {
- Connection connection = db.connect();
- Statement statement = connection.createStatement();
- System.out.println(tableName + ":" + messageBoxId);
- statement.execute(String.format(SQL_CREATE_MSGBOX_STATEMENT, tableName, messageBoxId));
- connection.commit();
- db.closeConnection(connection);
- msgBoxids.add(messageBoxId);
- } else
+
+ Connection connection = null;
+ try {
+ logger.debug(MSGBOXES_TABLENAME + ":" + messageBoxId);
+
+ connection = db.connect();
+ PreparedStatement statement = connection.prepareStatement(SQL_CREATE_MSGBOXES_STATEMENT);
+ statement.setString(1, messageBoxId);
+ db.executeUpdateAndClose(statement);
+ db.commitAndFree(connection);
+
+ msgBoxids.add(messageBoxId);
+
+ } catch (SQLException sql) {
+ db.rollbackAndFree(connection);
+ throw sql;
+ }
+ } else {
throw new AxisFault("The message box ID requested already exists");
+ }
}
public void addMessage(String msgBoxID, String messageID, String soapAction, OMElement message)
throws SQLException, IOException, XMLStreamException {
if (msgBoxids.contains(msgBoxID)) {
- Connection connection = db.connect();
- PreparedStatement stmt = connection.prepareStatement(SQL_STORE_MESSAGE_STATEMENT);
- byte[] buffer;
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(output);
- out.writeObject(message.toStringWithConsume());
- buffer = output.toByteArray();
- ByteArrayInputStream in = new ByteArrayInputStream(buffer);
- stmt.setBinaryStream(1, in, buffer.length);
- stmt.setString(2, msgBoxID);
- stmt.setString(3, messageID);
- stmt.setString(4, soapAction);
- db.insertAndClose(stmt);
- stmt.close();
- connection.commit();
- db.closeConnection(connection);
+
+ Connection connection = null;
+ try {
+ connection = db.connect();
+ PreparedStatement stmt = connection.prepareStatement(SQL_STORE_MESSAGE_STATEMENT);
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(output);
+ out.writeObject(message.toStringWithConsume());
+ byte[] buffer = output.toByteArray();
+ ByteArrayInputStream in = new ByteArrayInputStream(buffer);
+ stmt.setBinaryStream(1, in, buffer.length);
+ stmt.setString(2, msgBoxID);
+ stmt.setString(3, messageID);
+ stmt.setString(4, soapAction);
+
+ db.executeUpdateAndClose(stmt);
+ db.commitAndFree(connection);
+
+ } catch (SQLException sql) {
+ db.rollbackAndFree(connection);
+ throw sql;
+ }
} else {
throw new AxisFault("Currently a messagebox is not available with given message box id :" + msgBoxID);
}
@@ -132,12 +161,27 @@ public class MessageBoxDB {
public void deleteMessageBox(String msgBoxId) throws SQLException {
if (msgBoxids.contains(msgBoxId)) {
- Connection connection = db.connect();
- Statement statement = connection.createStatement();
- statement.execute(String.format(SQL_DELETE_ALL_STATEMENT, "msgbox", msgBoxId));
- statement.execute(String.format(SQL_DELETE_ALL_STATEMENT, "msgBoxes", msgBoxId));
- db.closeConnection(connection);
- msgBoxids.remove(msgBoxId);
+
+ Connection connection = null;
+ try {
+ connection = db.connect();
+ PreparedStatement statement = connection.prepareStatement(SQL_DELETE_MSGBOXES_STATEMENT);
+ statement.setString(1, msgBoxId);
+ db.executeUpdateAndClose(statement);
+ statement = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
+ statement.setString(1, msgBoxId);
+ db.executeUpdateAndClose(statement);
+
+ // commit
+ db.commitAndFree(connection);
+
+ // remove from set
+ msgBoxids.remove(msgBoxId);
+
+ } catch (SQLException sql) {
+ db.rollbackAndFree(connection);
+ throw sql;
+ }
}
}
@@ -145,57 +189,97 @@ public class MessageBoxDB {
ClassNotFoundException, XMLStreamException {
LinkedList<String> list = new LinkedList<String>();
if (msgBoxids.contains(msgBoxId)) {
- Connection connection = db.connect();
-
- PreparedStatement stmt = connection.prepareStatement(String.format(SQL_SELECT_STATEMENT1, "msgbox",
- msgBoxId));
- ResultSet resultSet = stmt.executeQuery();
-// resultSet.beforeFirst();
- while (resultSet.next()) {
- InputStream in = resultSet.getAsciiStream("content");
- ObjectInputStream s = new ObjectInputStream(in);
- String xmlString = (String) s.readObject();
- System.out.println(xmlString);
- list.addFirst(xmlString);
- }
- resultSet.close();
- stmt.close();
- stmt = connection.prepareStatement(String.format(SQL_DELETE_ALL_STATEMENT, "msgbox", msgBoxId));
- db.insertAndClose(stmt);
- stmt.close();
- connection.commit();
- db.closeConnection(connection);
+ Connection connection = null;
+ PreparedStatement stmt = null;
+ try {
+ connection = db.connect();
+ stmt = connection.prepareStatement(SQL_SELECT_MSGBOX_STATEMENT);
+ stmt.setString(1, msgBoxId);
+ ResultSet resultSet = stmt.executeQuery();
+ while (resultSet.next()) {
+ InputStream in = resultSet.getAsciiStream("content");
+ ObjectInputStream s = new ObjectInputStream(in);
+ String xmlString = (String) s.readObject();
+ logger.debug(xmlString);
+ list.addFirst(xmlString);
+ }
+ resultSet.close();
+ stmt.close();
+
+ /*
+ * Delete all retrieved messages
+ */
+ stmt = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
+ stmt.setString(1, msgBoxId);
+ db.executeUpdateAndClose(stmt);
+
+ // commit
+ db.commit(connection);
+ } catch (SQLException sql) {
+ db.rollback(connection);
+ throw sql;
+ } finally {
+
+ /*
+ * If there is error during query, close everything and throw
+ * error
+ */
+ try {
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+ }
+ } catch (SQLException sql) {
+ throw sql;
+ } finally {
+ if (connection != null) {
+ db.closeConnection(connection);
+ }
+ }
+ }
}
return list;
}
public void removeAncientMessages() {
+ Connection connection = null;
try {
- Connection connection = db.connect();
+ connection = db.connect();
+ PreparedStatement stmt = connection.prepareStatement(SQL_DELETE_ANCIENT_STATEMENT);
long persevetime = System.currentTimeMillis() - this.time;
- PreparedStatement stmt = connection.prepareStatement(String.format(SQL_DELETE_ANCIENT_STATEMENT, "msgBox",
- persevetime));
- db.insertAndClose(stmt);
- stmt.close();
- db.closeConnection(connection);
- } catch (SQLException e) {
- logger.fatal("Caught exception while removing old entries from msgbox db table", e);
+ stmt.setTimestamp(1, new Timestamp(persevetime));
+ db.executeUpdateAndClose(stmt);
+ db.commitAndFree(connection);
+ } catch (SQLException sql) {
+ db.rollbackAndFree(connection);
+ logger.error("Caught exception while removing old entries from msgbox db table", sql);
}
}
private static void setMsgBoxidList(JdbcStorage db) throws SQLException {
msgBoxids = Collections.synchronizedSet(new HashSet<String>());
- Connection connection = db.connect();
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(SELECT_ALL_FROM_MSGBOXES);
- while (resultSet.next()) {
- msgBoxids.add(resultSet.getString("msgboxid"));
- }
- statement.close();
- connection.commit();
- db.closeConnection(connection);
+
+ Connection connection = null;
+ PreparedStatement stmt = null;
+ try {
+ connection = db.connect();
+ stmt = connection.prepareStatement(SELECT_ALL_FROM_MSGBOXES);
+ ResultSet resultSet = stmt.executeQuery();
+ while (resultSet.next()) {
+ msgBoxids.add(resultSet.getString("msgboxid"));
+ }
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException sql) {
+ throw sql;
+ } finally {
+ db.commitAndFree(connection);
+ }
+ }
}
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java?rev=1172890&r1=1172889&r2=1172890&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java Mon Sep 19 23:13:33 2011
@@ -60,14 +60,13 @@ public class WsmgPersistantStorage imple
private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
private static final String TABLE_NAME_TO_CHECK = "subscription";
-
+
private Counter storeToDBCounter = new Counter();
private JdbcStorage db = null;
- String dbName = null;
+ private String dbName = null;
- // private ConnectionPool connectionPool;
public WsmgPersistantStorage(String ordinarySubsTblName, String specialSubsTblName, ConfigurationManager config)
throws AxisFault {
@@ -76,18 +75,19 @@ public class WsmgPersistantStorage imple
db = new JdbcStorage(config.getConfig(WsmgCommonConstants.CONFIG_JDBC_URL),
config.getConfig(WsmgCommonConstants.CONFIG_JDBC_DRIVER));
+ Connection conn = null;
try {
/*
* Check database
*/
- Connection conn = db.connect();
+ conn = db.connect();
if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
DatabaseCreator.createMsgBrokerDatabase(conn);
logger.info("New Database created for Message Broker");
} else {
logger.info("Database already created for Message Broker!");
}
- db.closeConnection(conn);
+
// inject dbname to sql statement.
SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY = String.format(
SubscriptionConstants.INSERT_SQL_QUERY, ordinarySubsTblName);
@@ -104,6 +104,24 @@ public class WsmgPersistantStorage imple
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw AxisFault.makeFault(e);
+ } finally {
+ if (conn != null) {
+ db.closeConnection(conn);
+ }
+ }
+ }
+
+ private void quietlyClose(Statement stmt, Connection conn) {
+ try {
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+ }
+ } catch (SQLException sql) {
+ logger.error(sql.getMessage(), sql);
+ }
+
+ if (conn != null) {
+ db.closeConnection(conn);
}
}
@@ -118,14 +136,17 @@ public class WsmgPersistantStorage imple
ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
String query = "SELECT * FROM " + dbName;
- ResultSet rs = null;
+ Connection conn = null;
+ PreparedStatement stmt = null;
try {
// get number of row first and increase the arrayList size for
// better performance
int size = db.countRow(dbName, "*");
- rs = db.query(query);
+ conn = db.connect();
+ stmt = conn.prepareStatement(query);
+ ResultSet rs = stmt.executeQuery();
ret.ensureCapacity(size);
if (rs != null) {
@@ -139,20 +160,12 @@ public class WsmgPersistantStorage imple
} catch (SQLException ex) {
logger.error("sql exception occured", ex);
} finally {
- if (rs != null) {
- try {
- db.close();
- rs.close();
- } catch (SQLException ex) {
- logger.error("sql exception occured", ex);
- }
- }
+ quietlyClose(stmt, conn);
}
return ret;
}
public int insert(SubscriptionState subscription) {
- PreparedStatement stmt = null;
String address = subscription.getConsumerReference().getAddress();
Map<QName, OMElement> referenceParametersMap = subscription.getConsumerReference().getAllReferenceParameters();
@@ -189,6 +202,7 @@ public class WsmgPersistantStorage imple
int result = 0;
Connection connection = null;
+ PreparedStatement stmt = null;
try {
connection = db.connect();
@@ -206,12 +220,10 @@ public class WsmgPersistantStorage imple
stmt.setTimestamp(8, now);
result = db.executeUpdateAndClose(stmt);
storeToDBCounter.addCounter();
+ db.commitAndFree(connection);
} catch (SQLException ex) {
logger.error("sql exception occured", ex);
- } finally {
- if (connection != null) {
- db.closeConnection(connection);
- }
+ db.rollbackAndFree(connection);
}
return result;
}
@@ -239,8 +251,8 @@ public class WsmgPersistantStorage imple
public Object blockingDequeue() {
while (true) {
try {
- //FIXME::: WHY RETURN KeyValueWrapper Object??????
- //FIXME::: Can it cast to OutGoingMessage????
+ // FIXME::: WHY RETURN KeyValueWrapper Object??????
+ // FIXME::: Can it cast to OutGoingMessage????
KeyValueWrapper wrapper = retrive();
done(wrapper.getKey());
return wrapper.getValue();
@@ -255,10 +267,24 @@ public class WsmgPersistantStorage imple
}
public void cleanup() {
+ Connection conn = null;
+ Statement stmt = null;
try {
- cleanDB();
+ conn = db.connect();
+ stmt = conn.createStatement();
+ batchCleanDB(stmt, conn);
} catch (SQLException e) {
logger.error(e.getMessage(), e);
+ } finally {
+ if (db.isAutoCommit()) {
+ try {
+ conn.setAutoCommit(true);
+ } catch (SQLException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ quietlyClose(stmt, conn);
}
}
@@ -271,35 +297,45 @@ public class WsmgPersistantStorage imple
int nextkey;
connection = db.connect();
+
lockMaxMinTables(connection);
+
stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+
ResultSet result = stmt.executeQuery();
if (result.next()) {
nextkey = result.getInt(1);
result.close();
stmt.close();
+
stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
- stmt.executeUpdate();
- stmt.close();
+ db.executeUpdateAndClose(stmt);
} else {
throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
}
+ /*
+ * After update MAX_ID put data into queue table
+ */
stmt = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
stmt.setInt(1, nextkey);
stmt.setString(2, trackId);
- byte[] buffer;
+
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(output);
out.writeObject(object);
- buffer = output.toByteArray();
+ byte[] buffer = output.toByteArray();
ByteArrayInputStream in = new ByteArrayInputStream(buffer);
stmt.setBinaryStream(3, in, buffer.length);
db.executeUpdateAndClose(stmt);
+ db.commit(connection);
+
} catch (SQLException sqlEx) {
+ db.rollback(connection);
logger.error("unable to enque the message in persistant storage", sqlEx);
} catch (IOException ioEx) {
+ db.rollback(connection);
logger.error("unable to enque the message in persistant storage", ioEx);
} finally {
try {
@@ -308,17 +344,7 @@ public class WsmgPersistantStorage imple
logger.error("Cannot Unlock Table", sql);
}
- if (connection != null) {
- db.closeConnection(connection);
- }
-
- try {
- if (stmt != null && !stmt.isClosed()) {
- stmt.close();
- }
- } catch (SQLException sql) {
- logger.error(sql.getMessage(), sql);
- }
+ quietlyClose(stmt, connection);
}
}
@@ -335,8 +361,8 @@ public class WsmgPersistantStorage imple
PreparedStatement stmt = null;
try {
connection = db.connect();
+
lockMaxMinTables(connection);
- logger.debug("locked tables (maxId and minId)4");
/*
* Get Max ID
@@ -346,8 +372,7 @@ public class WsmgPersistantStorage imple
if (!result.next()) {
stmt.close();
stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
- stmt.executeUpdate();
- stmt.close();
+ db.executeUpdateAndClose(stmt);
}
/*
@@ -358,27 +383,25 @@ public class WsmgPersistantStorage imple
if (!result.next()) {
stmt.close();
stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
- stmt.executeUpdate();
- stmt.close();
+ db.executeUpdateAndClose(stmt);
}
-
- logger.debug("unlocked tables (maxId and minId)4");
+ db.commit(connection);
+ } catch (SQLException sqle) {
+ db.rollback(connection);
+ throw sqle;
} finally {
- if (connection != null) {
- db.closeConnection(connection);
+ try {
+ unLockTables(connection);
+ } catch (SQLException sql) {
+ logger.error("Cannot Unlock Table", sql);
}
- unLockTables(connection);
-
- if (stmt != null && !stmt.isClosed()) {
- stmt.close();
- }
+ quietlyClose(stmt, connection);
}
}
private KeyValueWrapper retrive() throws SQLException, IOException {
Object obj = null;
- boolean loop = true;
int nextkey = -1;
int maxid = -2;
Connection connection = null;
@@ -386,12 +409,12 @@ public class WsmgPersistantStorage imple
ResultSet result = null;
long wait = 1000;
- while (loop) {
- connection = db.connect();
-
+ while (true) {
try {
+ connection = db.connect();
lockMaxMinTables(connection);
+
/*
* Get Min ID
*/
@@ -421,41 +444,36 @@ public class WsmgPersistantStorage imple
*/
if (maxid > nextkey) {
stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
- stmt.executeUpdate();
- stmt.close();
+ db.executeUpdateAndClose(stmt);
logger.debug("Update MIN ID by one");
-
- unLockTables(connection);
- logger.debug("unlocked tables (maxId and minId)1");
+ db.commit(connection);
break;
}
-
+ db.commit(connection);
+ } catch (SQLException sql) {
+ db.rollback(connection);
+ throw sql;
+ } finally {
try {
unLockTables(connection);
- logger.debug("unlocked tables (maxId and minId)1");
-
- wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
- logger.debug("Wait=" + wait);
- Thread.sleep(wait);
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- break;
- }
- } finally {
- /*
- * Make sure connection is always closed
- */
- if (connection != null) {
- db.closeConnection(connection);
+ } catch (SQLException sql) {
+ sql.printStackTrace();
+ logger.error("Cannot Unlock Table", sql);
}
- if (stmt != null && !stmt.isClosed()) {
- try {
- stmt.close();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
+ quietlyClose(stmt, connection);
+ }
+
+ /*
+ * Sleep if there is nothing to do
+ */
+ try {
+ wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
+ logger.debug("Wait=" + wait);
+ Thread.sleep(wait);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ break;
}
}
@@ -481,18 +499,7 @@ public class WsmgPersistantStorage imple
"MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
}
} finally {
- if (connection != null) {
- db.closeConnection(connection);
- }
-
- if (stmt != null) {
- try {
- stmt.close();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
-
+ quietlyClose(stmt, connection);
}
}
@@ -510,25 +517,23 @@ public class WsmgPersistantStorage imple
query = QueueContants.SQL_DELETE_STATEMENT + key;
PreparedStatement stmt = connection.prepareStatement(query);
db.executeUpdateAndClose(stmt);
- } finally {
- if (connection != null) {
- db.closeConnection(connection);
- }
+ db.commitAndFree(connection);
+ } catch (SQLException sqle) {
+ db.rollbackAndFree(connection);
+ throw sqle;
}
}
- public void cleanDB() throws SQLException {
+ private void batchCleanDB(Statement stmt, Connection con) throws SQLException {
DatabaseType databaseType = DatabaseType.other;
- Connection con = null;
- Statement stmt = null;
int[] aiupdateCounts = new int[0];
boolean bError = false;
try {
- con = db.connect();
- stmt = con.createStatement();
- stmt.clearBatch();
con.setAutoCommit(false);
+
+ stmt.clearBatch();
+
int totalStatement = 0;
try {
@@ -568,10 +573,9 @@ public class WsmgPersistantStorage imple
logger.error("Message: " + bue.getMessage());
logger.error("Vendor: " + bue.getErrorCode());
logger.info("Update counts: ");
- int[] updateCounts = bue.getUpdateCounts();
- for (int i = 0; i < updateCounts.length; i++) {
- logger.error(updateCounts[i] + " ");
+ for (int i = 0; i < aiupdateCounts.length; i++) {
+ logger.error(aiupdateCounts[i] + " ");
}
SQLException SQLe = bue;
@@ -579,13 +583,12 @@ public class WsmgPersistantStorage imple
SQLe = SQLe.getNextException();
logger.error(SQLe.getMessage(), SQLe);
}
- } // end BatchUpdateException catch
- catch (SQLException SQLe) {
- logger.error(SQLe.getMessage(), SQLe);
- } // end SQLException catch
- finally {
+ } catch (SQLException SQLe) {
+ bError = true;
+ throw SQLe;
+ } finally {
// determine operation result
- for (int i = 0; i < aiupdateCounts.length; i++) {
+ for (int i = 0; !bError && i < aiupdateCounts.length; i++) {
int iProcessed = aiupdateCounts[i];
/**
* The int values that can be returned in the update counts
@@ -604,7 +607,6 @@ public class WsmgPersistantStorage imple
// error on statement
logger.info("Error batch." + iProcessed);
bError = true;
- break;
}
}
@@ -615,28 +617,14 @@ public class WsmgPersistantStorage imple
}
/*
- * Close previous execution statement if error occurs
- */
- if (stmt != null && !stmt.isClosed()) {
- stmt.close();
- }
-
- /*
* Unlock table after rollback and commit, since it is not automatic
- * in mysql
+ * in MySql
*/
if (DatabaseType.mysql.equals(databaseType)) {
PreparedStatement prepareStmt = con.prepareCall("unlock tables;");
db.executeUpdateAndClose(prepareStmt);
}
-
- /*
- * Release connection
- */
- db.closeConnection(con);
-
- con.setAutoCommit(true);
} // end finally
logger.info("Queue is cleaned.");
}
@@ -650,27 +638,26 @@ public class WsmgPersistantStorage imple
}
/*
- * Must turn off autocommit
+ * Must turn off auto commit
*/
connection.setAutoCommit(false);
String sql = null;
- PreparedStatement stmt = null;
+ Statement stmt = null;
try {
switch (databaseType) {
case derby:
sql = "LOCK TABLE " + QueueContants.TABLE_NAME_MAXID + " IN EXCLUSIVE MODE";
String sql2 = "LOCK TABLE " + QueueContants.TABLE_NAME_MINID + " IN EXCLUSIVE MODE";
- stmt = connection.prepareStatement(sql);
- stmt.execute();
- stmt.close();
- stmt = connection.prepareStatement(sql2);
- stmt.execute();
+ stmt = connection.createStatement();
+ stmt.addBatch(sql);
+ stmt.addBatch(sql2);
+ stmt.executeBatch();
break;
case mysql:
sql = "lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + "," + QueueContants.TABLE_NAME_MINID
+ " write";
- stmt = connection.prepareStatement(sql);
- stmt.executeQuery();
+ stmt = connection.createStatement();
+ stmt.executeQuery(sql);
break;
default:
return;
@@ -684,7 +671,6 @@ public class WsmgPersistantStorage imple
}
private void unLockTables(Connection connection) throws SQLException {
- String sql = "";
DatabaseType databaseType = DatabaseType.other;
try {
databaseType = DatabaseCreator.getDatabaseType(connection);
@@ -695,14 +681,19 @@ public class WsmgPersistantStorage imple
try {
switch (databaseType) {
case derby:
- connection.commit();
+ /*
+ * Derby has no explicit unlock SQL. It uses commit or
+ * rollback as the unlock mechanism, so make sure the connection
+ * is always committed or rolled back.
+ */
break;
case mysql:
- sql = "unlock tables";
+ String sql = "unlock tables";
PreparedStatement stmt = null;
try {
stmt = connection.prepareStatement(sql);
stmt.executeQuery();
+ db.commit(connection);
} finally {
if (stmt != null) {
stmt.close();
@@ -713,7 +704,12 @@ public class WsmgPersistantStorage imple
return;
}
} finally {
- connection.setAutoCommit(true);
+ /*
+ * Set auto commit when needed
+ */
+ if (db.isAutoCommit()) {
+ connection.setAutoCommit(true);
+ }
}
}