You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/17 04:32:24 UTC
svn commit: r1171884 [2/2] - in
/incubator/airavata/trunk/modules/ws-messenger:
commons/src/main/java/org/apache/airavata/wsmg/commons/config/
commons/src/main/java/org/apache/airavata/wsmg/commons/storage/
messagebox/src/main/org/apache/airavata/wsmg/...
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java Sat Sep 17 02:32:24 2011
@@ -33,59 +33,48 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.StringTokenizer;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.storage.DatabaseCreator.DatabaseType;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.util.Counter;
import org.apache.airavata.wsmg.util.TimerThread;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class WsmgPersistantStorage implements WsmgStorage {
- org.apache.log4j.Logger logger = Logger.getLogger(WsmgPersistantStorage.class);
-
- JdbcStorage db = null;
-
- Connection conn = null;
+ private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
private Counter storeToDBCounter = new Counter();
- /*
- * this thing is never used in this context
- */
- // private final static XmlInfosetBuilder builder = XmlConstants.BUILDER;
+ private JdbcStorage db = null;
+
String dbName = null;
// private ConnectionPool connectionPool;
public WsmgPersistantStorage(String ordinarySubsTblName, String specialSubsTblName, ConfigurationManager config)
throws AxisFault {
- /*
- * try { conn = db.connect(); } catch (SQLException ex) { ex.printStackTrace(); }
- */
+
this.dbName = ordinarySubsTblName;
db = new JdbcStorage(config.getConfig(WsmgCommonConstants.CONFIG_JDBC_URL),
config.getConfig(WsmgCommonConstants.CONFIG_JDBC_DRIVER));
- // Lets connect to the database and create tables if they are not
- // already there.
-
-
// inject dbname to sql statement.
-
SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY = String.format(
SubscriptionConstants.INSERT_SQL_QUERY, ordinarySubsTblName);
@@ -99,62 +88,50 @@ public class WsmgPersistantStorage imple
try {
initMessageQueueStorage();
-
} catch (SQLException sqlEx) {
throw AxisFault.makeFault(sqlEx);
}
-
}
/*
* (non-Javadoc)
*
- * @see org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#getAllSubscription()
+ * @see
+ * org.apache.airavata.wsmg.commons.storage.WsmgStorage#getAllSubscription()
*/
public List<SubscriptionEntry> getAllSubscription() {
ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
- int totalRecord = 0;
String query = "SELECT * FROM " + dbName;
ResultSet rs = null;
try {
- rs = db.query(query);
- if (rs != null) {
- // Get total number of rows
- // Point to the last row in resultset.
- rs.last();
- // Get the row position which is also the number of rows in the
- // ResultSet.
- totalRecord = rs.getRow();
- logger.debug("TotalRecord=" + totalRecord);
- // Create String array to return
-
- ret.ensureCapacity(totalRecord);
-
- // Reposition at the beginning of the ResultSet to take up
- // rs.next() call.
-// rs.beforeFirst();
+ // get number of row first and increase the arrayList size for
+ // better performance
+ int size = db.countRow(dbName, "*");
+
+ rs = db.query(query);
+ ret.ensureCapacity(size);
+ if (rs != null) {
while (rs.next()) {
SubscriptionEntry subscriptionEntry = new SubscriptionEntry();
subscriptionEntry.setSubscriptionId(rs.getString("SubscriptionId"));
subscriptionEntry.setSubscribeXml(rs.getString("content"));
-
- /*
- * int policyValue = rs.getInt("wsrm"); subscriptionEntry[i] .setWsrmPolicy(policyValue ==
- * WsmgCommonConstants.WSRM_POLICY_TRUE);
- */
-
ret.add(subscriptionEntry);
-
}
- rs.close();
}
} catch (SQLException ex) {
- logger.fatal("sql exception occured", ex);
- ret = new ArrayList<SubscriptionEntry>();
+ logger.error("sql exception occured", ex);
+ } finally {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException ex) {
+ logger.error("sql exception occured", ex);
+ }
+ }
}
return ret;
}
@@ -181,23 +158,21 @@ public class WsmgPersistantStorage imple
}
}
-
consumerReferenceParameters = buffer.toString();
-
}
+
int policyValue = WsmgCommonConstants.WSRM_POLICY_FALSE;
if (subscription.isWsrmPolicy()) {
policyValue = WsmgCommonConstants.WSRM_POLICY_TRUE;
}
- java.util.Date today = new java.util.Date();
- java.sql.Timestamp now = new java.sql.Timestamp(today.getTime());
+ Date today = new Date();
+ Timestamp now = new Timestamp(today.getTime());
String sql = subscription.isNeverExpire() ? SubscriptionConstants.SPECIAL_SUBSCRIPTION_INSERT_QUERY
: SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY;
int result = 0;
-
Connection connection = null;
try {
@@ -205,8 +180,8 @@ public class WsmgPersistantStorage imple
stmt = connection.prepareStatement(sql);
stmt.setString(1, subscription.getId());
- stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()),
- subscription.getSubscribeXml().getBytes().length);
+ stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()), subscription
+ .getSubscribeXml().getBytes().length);
stmt.setInt(3, policyValue);
stmt.setString(4, subscription.getLocalTopic());
stmt.setString(5, subscription.getXpathString());
@@ -214,137 +189,75 @@ public class WsmgPersistantStorage imple
stmt.setBinaryStream(7, new ByteArrayInputStream(consumerReferenceParameters.getBytes()),
consumerReferenceParameters.getBytes().length);
stmt.setTimestamp(8, now);
- result = db.executeUpdate(stmt);
+ result = db.executeUpdateAndClose(stmt);
storeToDBCounter.addCounter();
} catch (SQLException ex) {
- logger.fatal("sql exception occured", ex);
+ logger.error("sql exception occured", ex);
} finally {
-
if (connection != null) {
- try {
- db.closeConnection(connection);
- } catch (SQLException e) {
-
- e.printStackTrace();
- }
+ db.closeConnection(connection);
}
-
}
-
return result;
-
}
+
/*
* (non-Javadoc)
*
- * @see org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#delete(java.lang.String)
+ * @see
+ * org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#delete(java
+ * .lang.String)
*/
public int delete(String subscriptionId) {
String query;
query = "DELETE FROM " + dbName + " WHERE SubscriptionId='" + subscriptionId + "'";
try {
- db.update(query);
+ return db.update(query);
} catch (SQLException ex) {
- logger.fatal("sql exception occured", ex);
- // TODO : throw this exception
+ logger.error("sql exception occured", ex);
}
return 0;
}
- /**
- * If data base tables are defined as SQL queries in file placed at xregistry/tables.sql in the classpath, those SQL
- * queries are execuated against the data base. On the file, any line start # is igonred as a comment.
- *
- * @throws XregistryException
- */
- private void initDatabaseTables(JdbcStorage jdbcStorage) throws AxisFault {
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- InputStream sqltablesAsStream = cl.getResourceAsStream("broker-tables.sql");
-
- if (sqltablesAsStream == null) {
- return;
- }
-
- Connection connection = jdbcStorage.connect();
- try {
- Statement statement = connection.createStatement();
-
- String docAsStr = CommonRoutines.readFromStream(sqltablesAsStream);
- StringTokenizer t = new StringTokenizer(docAsStr, ";");
-
- while (t.hasMoreTokens()) {
- String line = t.nextToken();
- if (line.trim().length() > 0 && !line.startsWith("#")) {
- System.out.println(line.trim());
- statement.executeUpdate(line.trim());
- }
- }
- } catch (SQLException e) {
- throw AxisFault.makeFault(e);
- } catch (IOException e) {
- throw AxisFault.makeFault(e);
- } finally {
- try {
- jdbcStorage.closeConnection(connection);
- } catch (SQLException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }
-
// QUEUE related
public Object blockingDequeue() {
-
- boolean gotValue = false;
-
- while (!gotValue) {
+ while (true) {
try {
KeyValueWrapper wrapper = retrive();
-
done(wrapper.getKey());
return wrapper.getValue();
} catch (SQLException e) {
- // TODO Auto-generated catch block
+ logger.error(e.getMessage(), e);
e.printStackTrace();
} catch (IOException e) {
- // TODO Auto-generated catch block
+ logger.error(e.getMessage(), e);
e.printStackTrace();
}
}
- return null;
-
}
public void cleanup() {
-
try {
cleanDB();
- initMessageQueueStorage();
} catch (SQLException e) {
- logger.error(e);
+ logger.error(e.getMessage(), e);
}
-
}
public void enqueue(Object object, String trackId) {
// Get the Max ID cache and update and unlock the table
Connection connection = null;
+ PreparedStatement stmt = null;
try {
- connection = db.connect();
-
- PreparedStatement stmt = null;
- lockTables(connection,stmt);
- // System.out.println("locked maxId table");
+ int nextkey;
+ connection = db.connect();
+ lockMaxMinTables(connection);
stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
ResultSet result = stmt.executeQuery();
- int nextkey;
-
if (result.next()) {
nextkey = result.getInt(1);
result.close();
@@ -353,17 +266,10 @@ public class WsmgPersistantStorage imple
stmt.executeUpdate();
stmt.close();
} else {
- nextkey = 1;
- result.close();
- stmt.close();
- stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
- stmt.executeUpdate();
- stmt.close();
+ throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
}
- unLockTables(connection,stmt);
stmt = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
-
stmt.setInt(1, nextkey);
stmt.setString(2, trackId);
byte[] buffer;
@@ -373,22 +279,29 @@ public class WsmgPersistantStorage imple
buffer = output.toByteArray();
ByteArrayInputStream in = new ByteArrayInputStream(buffer);
stmt.setBinaryStream(3, in, buffer.length);
- db.executeUpdate(stmt);
+ db.executeUpdateAndClose(stmt);
} catch (SQLException sqlEx) {
logger.error("unable to enque the message in persistant storage", sqlEx);
} catch (IOException ioEx) {
logger.error("unable to enque the message in persistant storage", ioEx);
-
} finally {
+ try {
+ unLockTables(connection);
+ } catch (SQLException sql) {
+ logger.error("Cannot Unlock Table", sql);
+ }
if (connection != null) {
- try {
- db.closeConnection(connection);
- } catch (SQLException e) {
- e.printStackTrace();
- }
+ db.closeConnection(connection);
}
+ try {
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+ }
+ } catch (SQLException sql) {
+ logger.error(sql.getMessage(), sql);
+ }
}
}
@@ -401,201 +314,236 @@ public class WsmgPersistantStorage imple
}
private void initMessageQueueStorage() throws SQLException {
- Connection connection = db.connect();
- String sql = "";
- String databaseType = "";
+ Connection connection = null;
PreparedStatement stmt = null;
- lockTables(connection,stmt);
- // System.out.println("locked tables (maxId and minId)4");
- stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
- ResultSet result = stmt.executeQuery();
- if (!result.next()) {
- result.close();
- stmt.close();
- stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
- stmt.executeUpdate();
- stmt.close();
- }
-
- stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
- result = stmt.executeQuery();
-
- if (!result.next()) {
- result.close();
- stmt.close();
- stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
- stmt.executeUpdate();
- stmt.close();
- }
- unLockTables(connection,stmt);
- db.closeConnection(connection);
- // System.out.println("unlocked tables (maxId and minId)4");
+ try {
+ connection = db.connect();
+ lockMaxMinTables(connection);
+ logger.debug("locked tables (maxId and minId)4");
+
+ /*
+ * Get Max ID
+ */
+ stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+ ResultSet result = stmt.executeQuery();
+ if (!result.next()) {
+ stmt.close();
+ stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
+ stmt.executeUpdate();
+ stmt.close();
+ }
+
+ /*
+ * Get Min ID
+ */
+ stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+ result = stmt.executeQuery();
+ if (!result.next()) {
+ stmt.close();
+ stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
+ stmt.executeUpdate();
+ stmt.close();
+ }
+
+ logger.debug("unlocked tables (maxId and minId)4");
+ } finally {
+ if (connection != null) {
+ db.closeConnection(connection);
+ }
+
+ unLockTables(connection);
+
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+ }
+ }
}
- public KeyValueWrapper retrive() throws SQLException, IOException {
+ private KeyValueWrapper retrive() throws SQLException, IOException {
Object obj = null;
-
- // Get the smallest id
- Connection connection = db.connect();
boolean loop = true;
-
int nextkey = -1;
int maxid = -2;
+ Connection connection = null;
PreparedStatement stmt = null;
ResultSet result = null;
- boolean connectionClosed = false;
long wait = 1000;
- while (loop) {
- lockTables(connection,stmt);
- stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
- result = stmt.executeQuery();
-
- if (result.next()) {
- nextkey = result.getInt(1);
- result.close();
- stmt.close();
- } else {
- throw new RuntimeException("Queue init has failed earlier");
- }
+ while (loop) {
+ connection = db.connect();
- stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
- result = stmt.executeQuery();
+ try {
- if (result.next()) {
- maxid = result.getInt(1);
- result.close();
- stmt.close();
+ lockMaxMinTables(connection);
+ /*
+ * Get Min ID
+ */
+ stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+ result = stmt.executeQuery();
+ if (result.next()) {
+ nextkey = result.getInt(1);
+ stmt.close();
+ } else {
+ throw new RuntimeException("Queue init has failed earlier");
+ }
- } else {
- throw new RuntimeException("Queue init has failed earlier");
- }
+ /*
+ * Get Max ID
+ */
+ stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+ result = stmt.executeQuery();
+ if (result.next()) {
+ maxid = result.getInt(1);
+ stmt.close();
+ } else {
+ throw new RuntimeException("Queue init has failed earlier");
+ }
+ /*
+ * Update value and exit the loop
+ */
+ if (maxid > nextkey) {
+ stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
+ stmt.executeUpdate();
+ stmt.close();
+ logger.debug("Update MIN ID by one");
- unLockTables(connection,stmt);
- // System.out.println("unlocked tables (maxId and minId)1");
- if (maxid > nextkey) {
- loop = false;
- stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
- stmt.executeUpdate();
- stmt.close();
- unLockTables(connection,stmt);
- // System.out.println("unlocked tables (maxId and minId) 2");
+ unLockTables(connection);
+ logger.debug("unlocked tables (maxId and minId)1");
+ break;
+ }
- } else {
try {
- unLockTables(connection,stmt);
- db.closeConnection(connection);
- connectionClosed = true;
+ unLockTables(connection);
+ logger.debug("unlocked tables (maxId and minId)1");
+
wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
- // System.out.println("Wait="+wait);
+ logger.debug("Wait=" + wait);
Thread.sleep(wait);
} catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ break;
+ }
+ } finally {
+ /*
+ * Make sure connection is always closed
+ */
+ if (connection != null) {
+ db.closeConnection(connection);
+ }
+ if (stmt != null && !stmt.isClosed()) {
+ try {
+ stmt.close();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
}
}
- if (connectionClosed) {
- connection = db.connect();
- connectionClosed = false;
- }
}
- stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
- result = stmt.executeQuery();
- // FIXME: THis loop caused out-of-memory
- while (!result.next()) {
- // FIXME Remove this while loop and change the
- // order of db update if possible in the save.
- result.close();
- stmt.close();
- // System.out.println("Looping in 1");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
-
- logger.error("interruped while thread sleep", e);
- }
+ /*
+ * Create Subscription Object from MIN_ID
+ */
+ try {
+ connection = db.connect();
stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
result = stmt.executeQuery();
+ if (result.next()) {
+ int id = result.getInt(1);
+ InputStream in = result.getAsciiStream(2);
+ ObjectInputStream s = new ObjectInputStream(in);
+ try {
+ obj = s.readObject();
+ } catch (ClassNotFoundException e) {
+ logger.error(e.getMessage(), e);
+ }
+ return new KeyValueWrapper(id, obj);
+ } else {
+ throw new RuntimeException(
+ "MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
+ }
+ } finally {
+ if (connection != null) {
+ db.closeConnection(connection);
+ }
- }
- int id = result.getInt(1);
- InputStream in = result.getAsciiStream(2);
- ObjectInputStream s = new ObjectInputStream(in);
- try {
- obj = s.readObject();
- } catch (ClassNotFoundException e) {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
- e.printStackTrace();
}
- result.close();
- stmt.close();
-
- db.closeConnection(connection);
-
- return new KeyValueWrapper(id, obj);
-
}
- public void done(int key) throws SQLException {
+ /**
+ * Delete data in subscription table since it is read
+ *
+ * @param key
+ * @throws SQLException
+ */
+ private void done(int key) throws SQLException {
String query = null;
- PreparedStatement stmt = null;
- Connection connection = db.connect();
+ Connection connection = null;
try {
+ connection = db.connect();
query = QueueContants.SQL_DELETE_STATEMENT + key;
- // System.out.println("Deleting key="+key);
- stmt = connection.prepareStatement(query);
- db.executeUpdate(stmt);
- stmt.close();
+ PreparedStatement stmt = connection.prepareStatement(query);
+ db.executeUpdateAndClose(stmt);
} finally {
- db.closeConnection(connection);
+ if (connection != null) {
+ db.closeConnection(connection);
+ }
}
-
}
public void cleanDB() throws SQLException {
- Connection con = db.connect();
- Statement stmt = con.createStatement();
+ DatabaseType databaseType = DatabaseType.other;
+ Connection con = null;
+ Statement stmt = null;
int[] aiupdateCounts = new int[0];
boolean bError = false;
try {
+ con = db.connect();
+ stmt = con.createStatement();
+ stmt.clearBatch();
con.setAutoCommit(false);
- // ...
- bError = false;
- stmt.clearBatch();
int totalStatement = 0;
- String databaseType = "";
+
try {
- databaseType = DatabaseCreator.getDatabaseType(conn);
+ databaseType = DatabaseCreator.getDatabaseType(con);
} catch (Exception e) {
- logger.error("Error evaluating database type");
+ logger.error("Error evaluating database type", e);
}
// add SQL statements
- if("mysql".equals(databaseType)){
+ if (DatabaseType.mysql.equals(databaseType)) {
stmt.addBatch("lock tables disQ write, MaxIDTable write, MinIDTable write;");
+ totalStatement++;
+ } else if (DatabaseType.derby.equals(databaseType)) {
+ stmt.addBatch("lock table disQ execusive mode;");
+ totalStatement++;
+ stmt.addBatch("lock table MaxIDTable execusive mode;");
+ totalStatement++;
+ stmt.addBatch("lock table MinIDTable execusive mode;");
+ totalStatement++;
}
- // System.out.println("locked tables (maxId and minId) 5");
stmt.addBatch("Delete from disQ;");
+ totalStatement++;
stmt.addBatch("Delete from MaxIDTable;");
+ totalStatement++;
stmt.addBatch("Delete from MinIDTable;");
-
- if ("mysql".equals(databaseType)) {
- stmt.addBatch("unlock tables;");
- }
- // System.out.println("unlocked tables (maxId and minId) 5");
- totalStatement = 5;
+ totalStatement++;
aiupdateCounts = new int[totalStatement];
// execute the statements
aiupdateCounts = stmt.executeBatch();
- } // end try
-
- // catch blocks
- // ...
- catch (BatchUpdateException bue) {
+ } catch (BatchUpdateException bue) {
bError = true;
aiupdateCounts = bue.getUpdateCounts();
logger.error("SQLException: " + bue.getMessage());
@@ -611,53 +559,147 @@ public class WsmgPersistantStorage imple
SQLException SQLe = bue;
while (SQLe != null) {
- // do exception stuff
-
SQLe = SQLe.getNextException();
- logger.error(SQLe);
+ logger.error(SQLe.getMessage(), SQLe);
}
} // end BatchUpdateException catch
catch (SQLException SQLe) {
- // ...
- logger.error(SQLe);
+ logger.error(SQLe.getMessage(), SQLe);
} // end SQLException catch
finally {
- db.closeConnection(con);
// determine operation result
for (int i = 0; i < aiupdateCounts.length; i++) {
int iProcessed = aiupdateCounts[i];
- // The int values that can be returned in the update counts
- // array are:
- // -3--Operation error. A driver has the option to stop at the
- // first error and throw a BatchUpdateException or to report the
- // error and continue. This value is only seen in the latter
- // case.
- // -2--The operation was successful, but the number of rows
- // affected is unknown.
- // Zero--DDL statement or no rows affected by the operation.
- // Greater than zero--Operation was successful, number of rows
- // affected by the operation.
- if (iProcessed > 0 || iProcessed == -2) {
- // statement was successful
- // ...
- } else {
+ /**
+ * The int values that can be returned in the update counts
+ * array are: <br/>
+ * -3--Operation error. A driver has the option to stop at the
+ * first error and throw a BatchUpdateException or to report the
+ * error and continue. This value is only seen in the latter
+ * case. <br/>
+ * -2--The operation was successful, but the number of rows
+ * affected is unknown. <br/>
+ * Zero--DDL statement or no rows affected by the operation.
+ * Greater than zero--Operation was successful, number of rows
+ * affected by the operation.
+ */
+ if (iProcessed < 0 && iProcessed != -2) {
// error on statement
- logger.info("Batch update successful.");
+ logger.info("Error batch." + iProcessed);
bError = true;
break;
}
- } // end for
+ }
if (bError) {
con.rollback();
} else {
con.commit();
}
+
+ /*
+ * Close previous execution statement if error occurs
+ */
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+ }
+
+ /*
+ * Unlock table after rollback and commit, since it is not automatic
+ * in mysql
+ */
+
+ if (DatabaseType.mysql.equals(databaseType)) {
+ PreparedStatement prepareStmt = con.prepareCall("unlock tables;");
+ db.executeUpdateAndClose(prepareStmt);
+ }
+
+ /*
+ * Release connection
+ */
+ db.closeConnection(con);
+
con.setAutoCommit(true);
} // end finally
logger.info("Queue is cleaned.");
}
+ private void lockMaxMinTables(Connection connection) throws SQLException {
+ DatabaseType databaseType = DatabaseType.other;
+ try {
+ databaseType = DatabaseCreator.getDatabaseType(connection);
+ } catch (Exception e) {
+ logger.error("Error evaluating database type", e);
+ }
+
+ /*
+ * Must turn off autocommit
+ */
+ connection.setAutoCommit(false);
+ String sql = null;
+ PreparedStatement stmt = null;
+ try {
+ switch (databaseType) {
+ case derby:
+ sql = "LOCK TABLE " + QueueContants.TABLE_NAME_MAXID + " EXCLUSIVE MODE";
+ String sql2 = "LOCK TABLE " + QueueContants.TABLE_NAME_MINID + " EXCLUSIVE MODE";
+ stmt = connection.prepareStatement(sql);
+ stmt.executeQuery();
+ stmt.close();
+ stmt = connection.prepareStatement(sql2);
+ stmt.executeQuery();
+ break;
+ case mysql:
+ sql = "lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + "," + QueueContants.TABLE_NAME_MINID
+ + " write";
+ stmt = connection.prepareStatement(sql);
+ stmt.executeQuery();
+ break;
+ default:
+ return;
+ }
+
+ } finally {
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+ }
+ }
+ }
+
+ private void unLockTables(Connection connection) throws SQLException {
+ String sql = "";
+ DatabaseType databaseType = DatabaseType.other;
+ try {
+ databaseType = DatabaseCreator.getDatabaseType(connection);
+ } catch (Exception e) {
+ logger.error("Error evaluating database type", e);
+ }
+
+ try {
+ switch (databaseType) {
+ case derby:
+ connection.commit();
+ break;
+ case mysql:
+ sql = "unlock tables";
+ PreparedStatement stmt = null;
+ try {
+ stmt = connection.prepareStatement(sql);
+ stmt.executeQuery();
+ } finally {
+ if (stmt != null) {
+ stmt.close();
+ }
+ }
+ break;
+ default:
+ return;
+ }
+ } finally {
+ connection.setAutoCommit(true);
+ }
+ }
+
private static class SubscriptionConstants {
public static String INSERT_SQL_QUERY = "INSERT INTO %s(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
@@ -666,7 +708,6 @@ public class WsmgPersistantStorage imple
public static String ORDINARY_SUBSCRIPTION_INSERT_QUERY = null;
public static String SPECIAL_SUBSCRIPTION_INSERT_QUERY = null;
-
}
private static class QueueContants {
@@ -700,46 +741,5 @@ public class WsmgPersistantStorage imple
public static String SQL_MIN_ID_INCREMENT = "UPDATE " + TABLE_NAME_MINID + " SET minID = minID+1 WHERE minID =";
}
- private void lockTables(Connection connection,PreparedStatement stmt)throws SQLException{
- String sql = "";
- String databaseType = "";
- try {
- databaseType = DatabaseCreator.getDatabaseType(connection);
- } catch (Exception e) {
- logger.error("Error evaluating database type");
- }
-
- if ("derby".equals(databaseType)) {
-// sql = "lock table " + QueueContants.TABLE_NAME_MAXID + " IN SHARE MODE";
-// stmt = connection.prepareStatement(sql);
-// stmt.executeUpdate();
-// sql = "lock table " + QueueContants.TABLE_NAME_MINID + " IN SHARE MODE";
-// connection.prepareStatement(sql);
-// stmt.executeUpdate();
-// stmt.close();
- } else if ("mysql".equals(databaseType)) {
- sql = "lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + ","
- + QueueContants.TABLE_NAME_MINID + " write";
- stmt = connection.prepareStatement(sql);
- stmt.executeQuery();
- stmt.close();
- }
- }
- private void unLockTables(Connection connection,PreparedStatement stmt)throws SQLException{
- String sql = "";
- String databaseType = "";
- try {
- databaseType = DatabaseCreator.getDatabaseType(connection);
- } catch (Exception e) {
- logger.error("Error evaluating database type");
- }
-
- if ("mysql".equals(databaseType)) {
- sql = "unlock tables";
- stmt = connection.prepareStatement(sql);
- stmt.executeQuery();
- stmt.close();
- }
- }
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java Sat Sep 17 02:32:24 2011
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Reentr
import org.apache.airavata.wsmg.broker.NotificationProcessor;
import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
import org.apache.airavata.wsmg.matching.XPath.YFilterMessageMatcher;
@@ -38,15 +39,9 @@ public class WsmgConfigurationContext {
private OutGoingQueue outgoingQueue = null;
- // private AbstractMessageMatcher messageMatcher;
-
private List<AbstractMessageMatcher> messageMatchers = new LinkedList<AbstractMessageMatcher>();
- private Map<Object, Object> publisherRegistrationDB = new HashMap<Object, Object>();// TODO:
-
- // parameterize
- // the
- // map
+ private Map<Object, Object> publisherRegistrationDB = new HashMap<Object, Object>();
private ReentrantReadWriteLock messegeMatchersLock = new ReentrantReadWriteLock();
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java Sat Sep 17 02:32:24 2011
@@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.util.RunTimeStatistics;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java Sat Sep 17 02:32:24 2011
@@ -31,7 +31,7 @@ import org.apache.airavata.wsmg.broker.C
import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
import org.apache.airavata.wsmg.commons.WsmgNameSpaceConstants;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.messenger.protocol.Axis2Protocol;
import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java Sat Sep 17 02:32:24 2011
@@ -31,7 +31,7 @@ import org.apache.airavata.wsmg.broker.C
import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
import org.apache.airavata.wsmg.messenger.SenderUtils;
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java Sat Sep 17 02:32:24 2011
@@ -35,7 +35,7 @@ import java.util.concurrent.locks.Reentr
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
import org.apache.airavata.wsmg.messenger.SenderUtils;
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java Sat Sep 17 02:32:24 2011
@@ -30,7 +30,7 @@ import org.apache.airavata.wsmg.broker.A
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
import org.apache.airavata.wsmg.messenger.SenderUtils;
Modified: incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java?rev=1171884&r1=1171883&r2=1171884&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java Sat Sep 17 02:32:24 2011
@@ -21,7 +21,6 @@
package org.apache.airavata.wsmg.messenger;
-import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
@@ -33,8 +32,8 @@ import javax.servlet.http.HttpServletRes
import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
import org.apache.airavata.wsmg.commons.WsmgVersion;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage;
-import org.apache.airavata.wsmg.config.ConfigurationManager;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;