You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ma...@apache.org on 2010/06/15 12:58:52 UTC

svn commit: r954809 - /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java

Author: marnie
Date: Tue Jun 15 10:58:52 2010
New Revision: 954809

URL: http://svn.apache.org/viewvc?rev=954809&view=rev
Log:
QPID-2664 Fixes for database resource clean up

Modified:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=954809&r1=954808&r2=954809&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Tue Jun 15 10:58:52 2010
@@ -358,36 +358,53 @@ public class DerbyMessageStore extends A
 
     private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
     {
-        Connection conn = newConnection();
-
-
-        Statement stmt = conn.createStatement();
-        ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
+        Connection conn = null;
+        Statement stmt = null;
         Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
-        while(rs.next())
+
+        try
         {
-            String queueName = rs.getString(1);
-            String owner = rs.getString(2);
-            AMQShortString queueNameShortString = new AMQShortString(queueName);
+            conn = newConnection();
+            stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
 
-            AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+            while(rs.next())
+            {
+                String queueName = rs.getString(1);
+                String owner = rs.getString(2);
+                AMQShortString queueNameShortString = new AMQShortString(queueName);
 
-            if (q == null)
+                AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+
+                if (q == null)
+                {
+                    q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
+                                                           null);
+                    _virtualHost.getQueueRegistry().registerQueue(q);
+                }
+
+                queueMap.put(queueNameShortString,q);
+
+                CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(String.valueOf(q.getName()), true));
+
+                //Record that we have a queue for recovery
+                _queueRecoveries.put(new AMQShortString(queueName), 0);
+            }
+        }
+        finally
+        {
+            if (stmt != null)
             {
-                q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
-                                                       null);
-                _virtualHost.getQueueRegistry().registerQueue(q);
+                stmt.close();
             }
-            
-            queueMap.put(queueNameShortString,q);
-            
-            CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(String.valueOf(q.getName()), true));
-
-            //Record that we have a queue for recovery
-            _queueRecoveries.put(new AMQShortString(queueName), 0);
 
+            if (conn != null)
+            {
+                conn.close();
+            }        
         }
-        return queueMap;
+
+       return queueMap;
     }
 
     private void recoverExchanges() throws AMQException, SQLException
@@ -404,12 +421,13 @@ public class DerbyMessageStore extends A
 
         List<Exchange> exchanges = new ArrayList<Exchange>();
         Connection conn = null;
+        Statement stmt = null;
         try
         {
             conn = newConnection();
 
 
-            Statement stmt = conn.createStatement();
+            stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
 
             Exchange exchange;
@@ -434,6 +452,11 @@ public class DerbyMessageStore extends A
         }
         finally
         {
+            if (stmt != null)
+            {
+                stmt.close();
+            }
+
             if(conn != null)
             {
                 conn.close();
@@ -449,11 +472,12 @@ public class DerbyMessageStore extends A
         QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
 
         Connection conn = null;
+        PreparedStatement stmt = null;
         try
         {
             conn = newConnection();
 
-            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
+            stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
             stmt.setString(1, exchange.getName().toString());
 
             ResultSet rs = stmt.executeQuery();
@@ -493,6 +517,11 @@ public class DerbyMessageStore extends A
         }
         finally
         {
+            if (stmt != null)
+            {
+                stmt.close();    
+            }
+
             if(conn != null)
             {
                 conn.close();
@@ -509,8 +538,7 @@ public class DerbyMessageStore extends A
 
     public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
     {
-
-        boolean localTx = getOrCreateTransaction(storeContext);
+         boolean localTx = getOrCreateTransaction(storeContext);
 
         Connection conn = getConnection(storeContext);
         ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload();
@@ -523,9 +551,10 @@ public class DerbyMessageStore extends A
 
         // first we need to look up the header to get the chunk count
         MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
+        PreparedStatement stmt = null;
         try
         {
-            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
+            stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
             stmt.setLong(1,messageId);
             wrapper.setRequiresCommit();
             int results = stmt.executeUpdate();
@@ -574,6 +603,25 @@ public class DerbyMessageStore extends A
 
             throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
         }
+        finally
+        {
+            try
+            {
+                if (stmt != null)
+                {
+                    stmt.close();
+                }
+
+                if (conn != null)
+                {
+                    conn.close();
+                }
+            }
+            catch(SQLException e)
+            {
+                throw new AMQException("Error closing database resources after removeMessage:" + e);
+            }
+        }
 
     }
 
@@ -584,12 +632,13 @@ public class DerbyMessageStore extends A
             try
             {
                 Connection conn = null;
+                PreparedStatement stmt = null;
 
                 try
                 {
                     conn = newConnection();
 
-                    PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE);
+                    stmt = conn.prepareStatement(FIND_EXCHANGE);
                     stmt.setString(1, exchange.getName().toString());
 
                     ResultSet rs = stmt.executeQuery();
@@ -609,6 +658,11 @@ public class DerbyMessageStore extends A
                 }
                 finally
                 {
+                    if (stmt != null)
+                    {
+                        stmt.close();
+                    }
+
                     if(conn != null)
                     {
                         conn.close();
@@ -657,7 +711,7 @@ public class DerbyMessageStore extends A
                }
                catch (SQLException e)
                {
-                   _logger.error(e);
+                    throw new AMQException("Error closing database resources:" + e);
                }
             }
 
@@ -670,13 +724,13 @@ public class DerbyMessageStore extends A
         if (_state != State.RECOVERING)
         {
             Connection conn = null;
-
+            PreparedStatement stmt = null;
 
             try
             {
                 conn = newConnection();
 
-                PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
+                stmt = conn.prepareStatement(FIND_BINDING);
                 stmt.setString(1, exchange.getName().toString() );
                 stmt.setString(2, queue.getName().toString());
                 stmt.setString(3, routingKey == null ? null : routingKey.toString());
@@ -718,36 +772,38 @@ public class DerbyMessageStore extends A
             }
             finally
             {
-                if(conn != null)
+                try
                 {
-                   try
-                   {
-                       conn.close();
-                   }
-                   catch (SQLException e)
-                   {
-                       _logger.error(e);
-                   }
-                }
+                    if (stmt != null)
+                    {
+                        stmt.close();
+                    }
 
-            }
+                    if(conn != null)
+                    {
+                        conn.close();
+                    }
 
+                }
+                catch (SQLException e)
+                {
+                    throw new AMQException("Error closing database resources:" + e);
+                }
+            }
         }
-
-
     }
 
     public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
             throws AMQException
     {
         Connection conn = null;
-
+        PreparedStatement stmt = null;
 
         try
         {
             conn = newConnection();
             // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
-            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
+            stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
             stmt.setString(1, exchange.getName().toString() );
             stmt.setString(2, queue.getName().toString());
             stmt.setString(3, routingKey == null ? null : routingKey.toString());
@@ -768,21 +824,23 @@ public class DerbyMessageStore extends A
         }
         finally
         {
-            if(conn != null)
+            try
             {
-               try
-               {
-                   conn.close();
-               }
-               catch (SQLException e)
-               {
-                   _logger.error(e);
-               }
-            }
+                if (stmt != null)
+                {
+                    stmt.close();
+                }
 
+                if (conn != null)
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+               throw new AMQException("Error closing database resources:" + e);
+            }
         }
-
-
     }
 
     public void createQueue(AMQQueue queue) throws AMQException
@@ -794,13 +852,16 @@ public class DerbyMessageStore extends A
     {
         _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
 
+        Connection conn = null;
+        PreparedStatement stmt = null;
+
         if (_state != State.RECOVERING)
         {
             try
             {
-                Connection conn = newConnection();
+                conn = newConnection();
 
-                PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
+                stmt = conn.prepareStatement(FIND_QUEUE);
                 stmt.setString(1, queue.getName().toString());
 
                 ResultSet rs = stmt.executeQuery();
@@ -826,6 +887,25 @@ public class DerbyMessageStore extends A
             {
                 throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
             }
+            finally
+            {
+                try
+                {
+                    if (stmt != null)
+                    {
+                        stmt.close();
+                    }
+
+                    if (conn != null)
+                    {
+                        conn.close();
+                    }
+                }
+                catch (SQLException e)
+                {
+                   throw new AMQException("Error closing database resources:" + e);
+                }
+            }
         }
     }
 
@@ -888,10 +968,11 @@ public class DerbyMessageStore extends A
         boolean localTx = getOrCreateTransaction(context);
         Connection conn = getConnection(context);
         ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+        PreparedStatement stmt = null;
 
         try
         {
-            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
+            stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
             stmt.setString(1,name.toString());
             stmt.setLong(2,messageId);
             stmt.executeUpdate();
@@ -919,6 +1000,26 @@ public class DerbyMessageStore extends A
             throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
                 + " to database", e);
         }
+        finally
+        {
+            try
+            {
+                if (stmt != null)
+                {
+                    stmt.close();
+                }
+
+                if (conn != null)
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+               throw new AMQException("Error closing database resources:" + e);
+            }
+        }
+
 
     }
 
@@ -929,10 +1030,11 @@ public class DerbyMessageStore extends A
         boolean localTx = getOrCreateTransaction(context);
         Connection conn = getConnection(context);
         ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+        PreparedStatement stmt = null;
 
         try
         {
-            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
+            stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
             stmt.setString(1,name.toString());
             stmt.setLong(2,messageId);
             int results = stmt.executeUpdate();
@@ -949,8 +1051,6 @@ public class DerbyMessageStore extends A
                 commitTran(context);
             }
 
-
-
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]");
@@ -966,6 +1066,25 @@ public class DerbyMessageStore extends A
             throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
                 + " from database", e);
         }
+        finally
+        {
+            try
+            {
+                if (stmt != null)
+                {
+                    stmt.close();
+                }
+
+                if (conn != null)
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+               throw new AMQException("Error closing database resources:" + e);
+            }
+        }
 
     }
 
@@ -1105,10 +1224,10 @@ public class DerbyMessageStore extends A
         boolean localTx = getOrCreateTransaction(context);
         Connection conn = getConnection(context);
         ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
+        PreparedStatement stmt = null;
         try
         {
-            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
+            stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
             stmt.setLong(1,messageId);
             stmt.setInt(2, index);
             byte[] chunkData = new byte[contentBody.getSize()];
@@ -1137,6 +1256,25 @@ public class DerbyMessageStore extends A
 
             throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
         }
+        finally
+        {
+            try
+            {
+                if (stmt != null)
+                {
+                    stmt.close();
+                }
+
+                if (conn != null)
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+               throw new AMQException("Error closing database resources:" + e);
+            }
+        }
 
     }
 
@@ -1147,11 +1285,11 @@ public class DerbyMessageStore extends A
         boolean localTx = getOrCreateTransaction(context);
         Connection conn = getConnection(context);
         ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
+        PreparedStatement stmt = null;
         try
         {
 
-            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
+            stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
             stmt.setLong(1,messageId);
             stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
             stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
@@ -1190,7 +1328,25 @@ public class DerbyMessageStore extends A
 
             throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
         }
+        finally
+        {
+            try
+            {
+                if (stmt != null)
+                {
+                    stmt.close();
+                }
 
+                if (conn != null)
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+               throw new AMQException("Error closing database resources:" + e);
+            }
+        }
 
     }
 
@@ -1198,12 +1354,12 @@ public class DerbyMessageStore extends A
     {
         boolean localTx = getOrCreateTransaction(context);
         Connection conn = getConnection(context);
-
+        PreparedStatement stmt = null;
 
         try
         {
 
-            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
+            stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
             stmt.setLong(1,messageId);
             ResultSet rs = stmt.executeQuery();
 
@@ -1275,7 +1431,25 @@ public class DerbyMessageStore extends A
 
             throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
         }
+        finally
+        {
+            try
+            {
+                if (stmt != null)
+                {
+                    stmt.close();
+                }
 
+                if (conn != null)
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+               throw new AMQException("Error closing database resources:" + e);
+            }
+        }
 
     }
 
@@ -1283,72 +1457,88 @@ public class DerbyMessageStore extends A
     {
         boolean localTx = getOrCreateTransaction(context);
                 Connection conn = getConnection(context);
+        PreparedStatement stmt = null;
 
+        try
+        {
 
-                try
-                {
-
-                    PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
-                    stmt.setLong(1,messageId);
-                    stmt.setInt(2, index);
-                    ResultSet rs = stmt.executeQuery();
-
-                    if(rs.next())
-                    {
-                        Blob dataAsBlob = rs.getBlob(1);
-
-                        final int size = (int) dataAsBlob.length();
-                        byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
-                        final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
-
-                        ContentChunk cb = new ContentChunk()
-                        {
-
-                            public int getSize()
-                            {
-                                return size;
-                            }
-
-                            public ByteBuffer getData()
-                            {
-                                return buf;
-                            }
-
-                            public void reduceToFit()
-                            {
+            stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+            stmt.setLong(1,messageId);
+            stmt.setInt(2, index);
+            ResultSet rs = stmt.executeQuery();
 
-                            }
-                        };
+            if(rs.next())
+            {
+                Blob dataAsBlob = rs.getBlob(1);
 
-                        if(localTx)
-                        {
-                            commitTran(context);
-                        }
+                final int size = (int) dataAsBlob.length();
+                byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
+                final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
 
-                        return cb;
+                ContentChunk cb = new ContentChunk()
+                {
 
+                    public int getSize()
+                    {
+                        return size;
                     }
-                    else
+
+                    public ByteBuffer getData()
                     {
-                        if(localTx)
-                        {
-                            abortTran(context);
-                        }
-                        throw new AMQException("Message not found for message with id " + messageId);
+                        return buf;
                     }
-                }
-                catch (SQLException e)
-                {
-                    if(localTx)
+
+                    public void reduceToFit()
                     {
-                        abortTran(context);
+
                     }
+                };
 
-                    throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+                if(localTx)
+                {
+                    commitTran(context);
                 }
 
+                return cb;
+
+            }
+            else
+            {
+                if(localTx)
+                {
+                    abortTran(context);
+                }
+                throw new AMQException("Message not found for message with id " + messageId);
+            }
+        }
+        catch (SQLException e)
+        {
+            if(localTx)
+            {
+                abortTran(context);
+            }
 
+            throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+        }
+        finally
+        {
+            try
+            {
+                if (stmt != null)
+                {
+                    stmt.close();
+                }
 
+                if (conn != null)
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+               throw new AMQException("Error closing database resources:" + e);
+            }
+        }
     }
 
     public boolean isPersistent()
@@ -1395,6 +1585,7 @@ public class DerbyMessageStore extends A
 
         final boolean inLocaltran = inTran(context);
         Connection conn = null;
+        PreparedStatement stmt = null;
         try
         {
             if(inLocaltran)
@@ -1411,8 +1602,8 @@ public class DerbyMessageStore extends A
 
             TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
 
-            Statement stmt = conn.createStatement();
-            ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+            stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY);
+            ResultSet rs = stmt.executeQuery();
 
             while (rs.next())
             {
@@ -1487,6 +1678,11 @@ public class DerbyMessageStore extends A
         }
         finally
         {
+            if (stmt != null)
+            {
+                stmt.close();
+            }
+
             if (inLocaltran && conn != null)
             {
                 conn.close();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org