You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/08/07 20:03:57 UTC

svn commit: r802113 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java

Author: ritchiem
Date: Fri Aug  7 18:03:56 2009
New Revision: 802113

URL: http://svn.apache.org/viewvc?rev=802113&view=rev
Log:
QPID-2002 : MessageStore Logging updates to include queue counts from persistent stores

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=802113&r1=802112&r2=802113&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Fri Aug  7 18:03:56 2009
@@ -95,6 +95,7 @@
 
     private String _connectionURL;
 
+    Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString, Integer>();
 
 
     private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
@@ -380,6 +381,11 @@
             }
             
             queueMap.put(queueNameShortString,q);
+            
+            CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(String.valueOf(q.getName()), true));
+
+            //Record that we have a queue for recovery
+            _queueRecoveries.put(new AMQShortString(queueName), 0);
 
         }
         return queueMap;
@@ -1378,7 +1384,6 @@
         Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
         List<ProcessAction> actions = new ArrayList<ProcessAction>();
 
-        Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
 
         final boolean inLocaltran = inTran(context);
         Connection conn = null;
@@ -1436,17 +1441,14 @@
                     _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
                 }
 
-                if (_logger.isInfoEnabled())
+                Integer count = _queueRecoveries.get(queueName);
+                if (count == null)
                 {
-                    Integer count = queueRecoveries.get(queueName);
-                    if (count == null)
-                    {
-                        count = 0;
-                    }
-
-                    queueRecoveries.put(queueName, ++count);
+                    count = 0;
                 }
 
+                _queueRecoveries.put(queueName, ++count);
+
                 actions.add(new ProcessAction(queue, context, message));
             }
 
@@ -1472,8 +1474,19 @@
 
         if (_logger.isInfoEnabled())
         {
-            _logger.info("Recovered message counts: " + queueRecoveries);
+            _logger.info("Recovered message counts: " + _queueRecoveries);
         }
+
+        for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet())
+        {
+            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1005(entry.getValue(), String.valueOf(entry.getKey())));
+
+            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1006(String.valueOf(entry.getKey()), true));
+        }
+
+        // Free the memory
+        _queueRecoveries = null;
+
     }
 
     private Connection getConnection(final StoreContext context)

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java?rev=802113&r1=802112&r2=802113&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java Fri Aug  7 18:03:56 2009
@@ -24,6 +24,9 @@
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
 
+import javax.jms.Connection;
+import javax.jms.Queue;
+import javax.jms.Session;
 import java.util.List;
 
 /**
@@ -71,7 +74,7 @@
      * This value will be logged on startup after the MessageStore has been
      * created.
      * Input:
-     * Default configuration                      
+     * Default configuration
      * Output:
      *
      * <date> MST-1002 : Store location : <path>
@@ -163,12 +166,18 @@
         //Validate each vhost logs a creation
         results = _monitor.findMatches("MST-1004");
 
-        assertEquals("Each vhost did not close its store.", vhosts.size(), results.size());
+        assertTrue("Each vhost did not close its store.", vhosts.size() <= results.size());
 
         for (int index = 0; index < results.size(); index++)
         {
             String result = getLog(results.get(index));
 
+            if (getMessageString(result).contains("Recovery Start :"))
+            {
+                //Don't test queue start recoveries
+                continue;
+            }
+
             // getSlize will return extract the vhost from vh(/test) -> '/test'
             // so remove the '/' to get the name
             String vhostName = AbstractTestLogSubject.getSlice("vh", result).substring(1);
@@ -224,12 +233,18 @@
         //Validate each vhost logs a creation
         results = _monitor.findMatches("MST-1006");
 
-        assertEquals("Each vhost did not close its store.", vhosts.size(), results.size());
+        assertTrue("Each vhost did not close its store.", vhosts.size() <= results.size());
 
         for (int index = 0; index < results.size(); index++)
         {
             String result = getLog(results.get(index));
 
+            if (getMessageString(result).contains("Recovery Complete :"))
+            {
+                //Don't test queue recovery completions
+                continue;
+            }
+
             // getSlize will return extract the vhost from vh(/test) -> '/test'
             // so remove the '/' to get the name
             String vhostName = AbstractTestLogSubject.getSlice("vh", result).substring(1);
@@ -251,4 +266,233 @@
         }
     }
 
+    /**
+     * Description:
+     * A persistent MessageStore may have data to recover from disk. The message store will use MST-1004 to report the start of recovery for a specific queue that it has previously persisted.
+     * Input:
+     * Default persistent configuration
+     * Output:
+     *
+     * {@code <date> MST-1004 : Recovery Start : <queue.name>}
+     *
+     * Validation Steps:
+     *
+     * 1. The MST ID is correct
+     * 2. This must occur after the recovery start MST-1004 has been logged (ordering is not asserted here).
+     */
+    public void testMessageStoreQueueRecoveryStart() throws Exception
+    {
+        assertLoggingNotYetOccured(MESSAGES_STORE_PREFIX);
+
+        startBroker();
+
+        List<String> results = _monitor.findMatches(MESSAGES_STORE_PREFIX);
+
+        // Validation
+
+        assertTrue("MST messages not logged", results.size() > 0);
+
+        // Load VirtualHost list from file.
+        Configuration configuration = ServerConfiguration.flatConfig(_configFile);
+        List<String> vhosts = configuration.getList("virtualhosts.virtualhost.name");
+
+        // Validate that exactly one queue recovery start was logged
+        results = _monitor.findMatches("MST-1004 : Recovery Start :");
+
+        assertEquals("Recovered test queue not found.", 1, results.size());
+
+        String result = getLog(results.get(0));
+
+        // getSlice will extract the vhost from vh(/test) -> '/test'
+        // so remove the '/' to get the name
+        String vhostName = AbstractTestLogSubject.getSlice("vh", result).substring(1);
+
+        // To get the store class used in the configuration we need to know
+        // the virtualhost name, found above. AND
+        // the index that the virtualhost is within the configuration.
+        // we can retrieve that from the vhosts list previously extracted.
+        String fullStoreName = configuration.getString("virtualhosts.virtualhost(" + vhosts.indexOf(vhostName) + ")." + vhostName + ".store.class");
+
+        // Get the Simple class name from the expected class name of o.a.q.s.s.MMS
+        String storeName = fullStoreName.substring(fullStoreName.lastIndexOf(".") + 1);
+
+        assertTrue("MST-1004 does not end with queue 'test-queue':" + getMessageString(result),
+                   getMessageString(result).endsWith("test-queue"));
+
+        assertEquals("The store name does not match expected value",
+                     storeName, AbstractTestLogSubject.getSlice("ms", fromSubject(result)));
+
+    }
+
+    /**
+     * Description:
+     * After the queue has been recovered the store will log that recovery has been completed. The MessageStore must not report further status about the recovery of this queue after this message. In addition every MST-1004 queue recovery start message must be matched with a MST-1006 recovery complete.
+     * Input:
+     * Default persistent configuration
+     * Output:
+     *
+     * {@code <date> MST-1006 : Recovery Complete : <queue.name>}
+     *
+     * Validation Steps:
+     *
+     * 1. The MST ID is correct
+     * 2. This must occur after the queue recovery start MST-1004 has been logged (ordering is not asserted here).
+     * 3. The queue.name is non-empty
+     * 4. The queue.name correlates with a previous recovery start
+     */
+    public void testMessageStoreQueueRecoveryComplete() throws Exception
+    {
+        assertLoggingNotYetOccured(MESSAGES_STORE_PREFIX);
+
+        startBroker();
+
+        List<String> results = _monitor.findMatches(MESSAGES_STORE_PREFIX);
+
+        // Validation
+
+        assertTrue("MST messages not logged", results.size() > 0);
+
+        // Load VirtualHost list from file.
+        Configuration configuration = ServerConfiguration.flatConfig(_configFile);
+        List<String> vhosts = configuration.getList("virtualhosts.virtualhost.name");
+
+        // Validate that exactly one queue recovery completion was logged
+        results = _monitor.findMatches("MST-1006 : Recovery Complete :");
+
+        assertEquals("Recovered test queue not found.", 1, results.size());
+
+        String result = getLog(results.get(0));
+
+        // getSlice will extract the vhost from vh(/test) -> '/test'
+        // so remove the '/' to get the name
+        String vhostName = AbstractTestLogSubject.getSlice("vh", result).substring(1);
+
+        // To get the store class used in the configuration we need to know
+        // the virtualhost name, found above. AND
+        // the index that the virtualhost is within the configuration.
+        // we can retrieve that from the vhosts list previously extracted.
+        String fullStoreName = configuration.getString("virtualhosts.virtualhost(" + vhosts.indexOf(vhostName) + ")." + vhostName + ".store.class");
+
+        // Get the Simple class name from the expected class name of o.a.q.s.s.MMS
+        String storeName = fullStoreName.substring(fullStoreName.lastIndexOf(".") + 1);
+
+        assertTrue("MST-1006 does not end with queue 'test-queue':" + getMessageString(result),
+                   getMessageString(result).endsWith("test-queue"));
+
+        assertEquals("The store name does not match expected value",
+                     storeName, AbstractTestLogSubject.getSlice("ms", fromSubject(result)));
+
+        results = _monitor.findMatches("MST-1004 : Recovery Start : test-queue");
+
+        assertEquals("MST-1004 for test-queue not found", 1, results.size());
+    }
+
+    /**
+     * Description:
+     * On recovery all the persistent messages that are stored on disk must be returned to the queue. MST-1005 will report the number of messages that have been recovered from disk.
+     * Input:
+     *
+     * 1. Default persistent configuration
+     * 2. Persistent queue with multiple messages enqueued
+     * Output:
+     *
+     * {@code <date> MST-1005 : Recovered <count> messages for queue <queue.name>}
+     *
+     * Validation Steps:
+     * 1. The MST ID is correct
+     * 2. This must occur after the queue recovery start MST-1004 has been logged (ordering is not asserted here).
+     * 3. The count is > 1
+     * 4. 'messages' is correctly printed
+     * 5. The queue.name is non-empty
+     */
+    public void testMessageStoreQueueRecoveryCountPlural() throws Exception
+    {
+        assertLoggingNotYetOccured(MESSAGES_STORE_PREFIX);
+
+        String queueName = "queueCountTest";
+
+        startBroker();
+        Connection connection = getConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='true'");
+
+        session.createConsumer(queue).close(); // presumably declares the durable queue on the broker - TODO confirm
+
+        final int messageCount = 10;
+
+        sendMessage(session, queue, messageCount);
+        try
+        {
+            connection.close();
+
+            stopBroker();
+
+            // Clear our monitor
+            _monitor.reset();
+
+            startBroker();
+
+            List<String> results = _monitor.findMatches(MESSAGES_STORE_PREFIX);
+
+            // Validation
+
+            assertTrue("MST messages not logged", results.size() > 0);
+
+            // Load VirtualHost list from file.
+            Configuration configuration = ServerConfiguration.flatConfig(_configFile);
+            List<String> vhosts = configuration.getList("virtualhosts.virtualhost.name");
+
+            // Validate that recovery of our queue was started exactly once
+            results = _monitor.findMatches("MST-1004 : Recovery Start : " + queueName);
+
+            assertEquals("Recovered test queue not found.", 1, results.size());
+
+            String result = getLog(results.get(0));
+
+            validateMessageID("MST-1004", result);
+
+            assertTrue("MST-1004 does not end with queue '" + queueName + "':" + getMessageString(result),
+                       getMessageString(result).endsWith(queueName));
+
+            results = _monitor.findMatches("MST-1005");
+
+            assertEquals("Unexpected number of MST-1005 messages logged.", 2, results.size());
+
+            result = getLog(results.get(0));
+
+            // If the first message is not our queue the second one will be
+            if (!result.contains(queueName))
+            {
+                result = getLog(results.get(1));
+            }
+
+            // getSlice will extract the vhost from vh(/test) -> '/test'
+            // so remove the '/' to get the name
+            String vhostName = AbstractTestLogSubject.getSlice("vh", result).substring(1);
+
+            // To get the store class used in the configuration we need to know
+            // the virtualhost name, found above. AND
+            // the index that the virtualhost is within the configuration.
+            // we can retrieve that from the vhosts list previously extracted.
+            String fullStoreName = configuration.getString("virtualhosts.virtualhost(" + vhosts.indexOf(vhostName) + ")." + vhostName + ".store.class");
+
+            // Get the Simple class name from the expected class name of o.a.q.s.s.MMS
+            String storeName = fullStoreName.substring(fullStoreName.lastIndexOf(".") + 1);
+
+            assertTrue("MST-1005 does not end with queue '" + queueName + "':" + getMessageString(result),
+                       getMessageString(result).endsWith(queueName));
+
+            assertTrue("MST-1005 does not show correct count:" + getMessageString(result),
+                       getMessageString(result).contains("Recovered " + messageCount + " messages"));
+
+            assertEquals("The store name does not match expected value",
+                         storeName, AbstractTestLogSubject.getSlice("ms", fromSubject(result)));
+        }
+        finally
+        {
+            //Ensure we attempt to drain the queue. NOTE(review): a failure here replaces any exception thrown from the try block.
+            assertEquals("Unable to drain queue", messageCount, drainQueue(queue));
+        }
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org