You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ma...@apache.org on 2010/06/15 12:58:52 UTC
svn commit: r954809 -
/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
Author: marnie
Date: Tue Jun 15 10:58:52 2010
New Revision: 954809
URL: http://svn.apache.org/viewvc?rev=954809&view=rev
Log:
QPID-2664 Fixes for database resource clean up
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=954809&r1=954808&r2=954809&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Tue Jun 15 10:58:52 2010
@@ -358,36 +358,53 @@ public class DerbyMessageStore extends A
private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
{
- Connection conn = newConnection();
-
-
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
+ Connection conn = null;
+ Statement stmt = null;
Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
- while(rs.next())
+
+ try
{
- String queueName = rs.getString(1);
- String owner = rs.getString(2);
- AMQShortString queueNameShortString = new AMQShortString(queueName);
+ conn = newConnection();
+ stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+ while(rs.next())
+ {
+ String queueName = rs.getString(1);
+ String owner = rs.getString(2);
+ AMQShortString queueNameShortString = new AMQShortString(queueName);
- if (q == null)
+ AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+
+ if (q == null)
+ {
+ q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
+ null);
+ _virtualHost.getQueueRegistry().registerQueue(q);
+ }
+
+ queueMap.put(queueNameShortString,q);
+
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(String.valueOf(q.getName()), true));
+
+ //Record that we have a queue for recovery
+ _queueRecoveries.put(new AMQShortString(queueName), 0);
+ }
+ }
+ finally
+ {
+ if (stmt != null)
{
- q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
- null);
- _virtualHost.getQueueRegistry().registerQueue(q);
+ stmt.close();
}
-
- queueMap.put(queueNameShortString,q);
-
- CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(String.valueOf(q.getName()), true));
-
- //Record that we have a queue for recovery
- _queueRecoveries.put(new AMQShortString(queueName), 0);
+ if (conn != null)
+ {
+ conn.close();
+ }
}
- return queueMap;
+
+ return queueMap;
}
private void recoverExchanges() throws AMQException, SQLException
@@ -404,12 +421,13 @@ public class DerbyMessageStore extends A
List<Exchange> exchanges = new ArrayList<Exchange>();
Connection conn = null;
+ Statement stmt = null;
try
{
conn = newConnection();
- Statement stmt = conn.createStatement();
+ stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
Exchange exchange;
@@ -434,6 +452,11 @@ public class DerbyMessageStore extends A
}
finally
{
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+
if(conn != null)
{
conn.close();
@@ -449,11 +472,12 @@ public class DerbyMessageStore extends A
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
Connection conn = null;
+ PreparedStatement stmt = null;
try
{
conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
+ stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
stmt.setString(1, exchange.getName().toString());
ResultSet rs = stmt.executeQuery();
@@ -493,6 +517,11 @@ public class DerbyMessageStore extends A
}
finally
{
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+
if(conn != null)
{
conn.close();
@@ -509,8 +538,7 @@ public class DerbyMessageStore extends A
public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
{
-
- boolean localTx = getOrCreateTransaction(storeContext);
+ boolean localTx = getOrCreateTransaction(storeContext);
Connection conn = getConnection(storeContext);
ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload();
@@ -523,9 +551,10 @@ public class DerbyMessageStore extends A
// first we need to look up the header to get the chunk count
MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
+ PreparedStatement stmt = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
+ stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
stmt.setLong(1,messageId);
wrapper.setRequiresCommit();
int results = stmt.executeUpdate();
@@ -574,6 +603,25 @@ public class DerbyMessageStore extends A
throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
}
+ finally
+ {
+ try
+ {
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch(SQLException e)
+ {
+ throw new AMQException("Error closing database resources after removeMessage:" + e);
+ }
+ }
}
@@ -584,12 +632,13 @@ public class DerbyMessageStore extends A
try
{
Connection conn = null;
+ PreparedStatement stmt = null;
try
{
conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE);
+ stmt = conn.prepareStatement(FIND_EXCHANGE);
stmt.setString(1, exchange.getName().toString());
ResultSet rs = stmt.executeQuery();
@@ -609,6 +658,11 @@ public class DerbyMessageStore extends A
}
finally
{
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+
if(conn != null)
{
conn.close();
@@ -657,7 +711,7 @@ public class DerbyMessageStore extends A
}
catch (SQLException e)
{
- _logger.error(e);
+ throw new AMQException("Error closing database resources:" + e);
}
}
@@ -670,13 +724,13 @@ public class DerbyMessageStore extends A
if (_state != State.RECOVERING)
{
Connection conn = null;
-
+ PreparedStatement stmt = null;
try
{
conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
+ stmt = conn.prepareStatement(FIND_BINDING);
stmt.setString(1, exchange.getName().toString() );
stmt.setString(2, queue.getName().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
@@ -718,36 +772,38 @@ public class DerbyMessageStore extends A
}
finally
{
- if(conn != null)
+ try
{
- try
- {
- conn.close();
- }
- catch (SQLException e)
- {
- _logger.error(e);
- }
- }
+ if (stmt != null)
+ {
+ stmt.close();
+ }
- }
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error closing database resources:" + e);
+ }
+ }
}
-
-
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
throws AMQException
{
Connection conn = null;
-
+ PreparedStatement stmt = null;
try
{
conn = newConnection();
// exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
+ stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
stmt.setString(1, exchange.getName().toString() );
stmt.setString(2, queue.getName().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
@@ -768,21 +824,23 @@ public class DerbyMessageStore extends A
}
finally
{
- if(conn != null)
+ try
{
- try
- {
- conn.close();
- }
- catch (SQLException e)
- {
- _logger.error(e);
- }
- }
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error closing database resources:" + e);
+ }
}
-
-
}
public void createQueue(AMQQueue queue) throws AMQException
@@ -794,13 +852,16 @@ public class DerbyMessageStore extends A
{
_logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+ Connection conn = null;
+ PreparedStatement stmt = null;
+
if (_state != State.RECOVERING)
{
try
{
- Connection conn = newConnection();
+ conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
+ stmt = conn.prepareStatement(FIND_QUEUE);
stmt.setString(1, queue.getName().toString());
ResultSet rs = stmt.executeQuery();
@@ -826,6 +887,25 @@ public class DerbyMessageStore extends A
{
throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
}
+ finally
+ {
+ try
+ {
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error closing database resources:" + e);
+ }
+ }
}
}
@@ -888,10 +968,11 @@ public class DerbyMessageStore extends A
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+ PreparedStatement stmt = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
+ stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
stmt.setString(1,name.toString());
stmt.setLong(2,messageId);
stmt.executeUpdate();
@@ -919,6 +1000,26 @@ public class DerbyMessageStore extends A
throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
+ " to database", e);
}
+ finally
+ {
+ try
+ {
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error closing database resources:" + e);
+ }
+ }
+
}
@@ -929,10 +1030,11 @@ public class DerbyMessageStore extends A
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+ PreparedStatement stmt = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
+ stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
stmt.setString(1,name.toString());
stmt.setLong(2,messageId);
int results = stmt.executeUpdate();
@@ -949,8 +1051,6 @@ public class DerbyMessageStore extends A
commitTran(context);
}
-
-
if (_logger.isDebugEnabled())
{
_logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]");
@@ -966,6 +1066,25 @@ public class DerbyMessageStore extends A
throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
+ " from database", e);
}
+ finally
+ {
+ try
+ {
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error closing database resources:" + e);
+ }
+ }
}
@@ -1105,10 +1224,10 @@ public class DerbyMessageStore extends A
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
+ PreparedStatement stmt = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
+ stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
stmt.setInt(2, index);
byte[] chunkData = new byte[contentBody.getSize()];
@@ -1137,6 +1256,25 @@ public class DerbyMessageStore extends A
throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
}
+ finally
+ {
+ try
+ {
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error closing database resources:" + e);
+ }
+ }
}
@@ -1147,11 +1285,11 @@ public class DerbyMessageStore extends A
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
+ PreparedStatement stmt = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
+ stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
stmt.setLong(1,messageId);
stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
@@ -1190,7 +1328,25 @@ public class DerbyMessageStore extends A
throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
}
+ finally
+ {
+ try
+ {
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error closing database resources:" + e);
+ }
+ }
}
@@ -1198,12 +1354,12 @@ public class DerbyMessageStore extends A
{
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
-
+ PreparedStatement stmt = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
+ stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
stmt.setLong(1,messageId);
ResultSet rs = stmt.executeQuery();
@@ -1275,7 +1431,25 @@ public class DerbyMessageStore extends A
throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
}
+ finally
+ {
+ try
+ {
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error closing database resources:" + e);
+ }
+ }
}
@@ -1283,72 +1457,88 @@ public class DerbyMessageStore extends A
{
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
+ PreparedStatement stmt = null;
+ try
+ {
- try
- {
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
- stmt.setInt(2, index);
- ResultSet rs = stmt.executeQuery();
-
- if(rs.next())
- {
- Blob dataAsBlob = rs.getBlob(1);
-
- final int size = (int) dataAsBlob.length();
- byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
- final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
-
- ContentChunk cb = new ContentChunk()
- {
-
- public int getSize()
- {
- return size;
- }
-
- public ByteBuffer getData()
- {
- return buf;
- }
-
- public void reduceToFit()
- {
+ stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ stmt.setInt(2, index);
+ ResultSet rs = stmt.executeQuery();
- }
- };
+ if(rs.next())
+ {
+ Blob dataAsBlob = rs.getBlob(1);
- if(localTx)
- {
- commitTran(context);
- }
+ final int size = (int) dataAsBlob.length();
+ byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
+ final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
- return cb;
+ ContentChunk cb = new ContentChunk()
+ {
+ public int getSize()
+ {
+ return size;
}
- else
+
+ public ByteBuffer getData()
{
- if(localTx)
- {
- abortTran(context);
- }
- throw new AMQException("Message not found for message with id " + messageId);
+ return buf;
}
- }
- catch (SQLException e)
- {
- if(localTx)
+
+ public void reduceToFit()
{
- abortTran(context);
+
}
+ };
- throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+ if(localTx)
+ {
+ commitTran(context);
}
+ return cb;
+
+ }
+ else
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+ throw new AMQException("Message not found for message with id " + messageId);
+ }
+ }
+ catch (SQLException e)
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+ throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+ }
+ finally
+ {
+ try
+ {
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error closing database resources:" + e);
+ }
+ }
}
public boolean isPersistent()
@@ -1395,6 +1585,7 @@ public class DerbyMessageStore extends A
final boolean inLocaltran = inTran(context);
Connection conn = null;
+ PreparedStatement stmt = null;
try
{
if(inLocaltran)
@@ -1411,8 +1602,8 @@ public class DerbyMessageStore extends A
TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY);
+ ResultSet rs = stmt.executeQuery();
while (rs.next())
{
@@ -1487,6 +1678,11 @@ public class DerbyMessageStore extends A
}
finally
{
+ if (stmt != null)
+ {
+ stmt.close();
+ }
+
if (inLocaltran && conn != null)
{
conn.close();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org