You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/07/06 13:04:59 UTC

svn commit: r1358118 - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ broker/src/main/java/org/apache/qpid/server/store/ broker/src/main...

Author: kwall
Date: Fri Jul  6 11:04:59 2012
New Revision: 1358118

URL: http://svn.apache.org/viewvc?rev=1358118&view=rev
Log:
QPID-4112: Virtualhosts recover exchanges before queues

Switch the recover order from queues,exchanges,... to exchanges,queues,.. so that when a queue with an alternate exchange
is recovered, the exchange's uuid is already in the registry.

Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Jul  6 11:04:59 2012
@@ -60,6 +60,7 @@ import org.apache.qpid.server.message.En
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.*;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
 import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
@@ -413,16 +414,16 @@ public abstract class AbstractBDBMessage
         try
         {
             List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
-            QueueRecoveryHandler qrh = recoveryHandler.begin(this);
-            _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
-            ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+            ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
             _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
 
-            BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+            QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
+            _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
+
+            BindingRecoveryHandler brh = qrh.completeQueueRecovery();
             _configuredObjectHelper.recoverBindings(brh, configuredObjects);
 
-            ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+            BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
             recoverBrokerLinks(lrh);
         }
         catch (DatabaseException e)

Modified: qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java Fri Jul  6 11:04:59 2012
@@ -261,13 +261,18 @@ public class QueueManagementTest extends
      */
     public void testAlternateExchangeSurvivesRestart() throws Exception
     {
+        String nonMandatoryExchangeName = "exch" + getName();
+
+        final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
+        managedBroker.createNewExchange(nonMandatoryExchangeName, "fanout", true);
+
         String queueName1 = getTestQueueName() + "1";
         String altExchange1 = "amq.fanout";
         String addr1WithAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName1, altExchange1);
         Queue queue1 = _session.createQueue(addr1WithAltExch);
 
         String queueName2 = getTestQueueName() + "2";
-        String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,}}", queueName2);
+        String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue}}", queueName2);
         Queue queue2 = _session.createQueue(addr2WithoutAltExch);
 
         createQueueOnBroker(queue1);
@@ -279,7 +284,7 @@ public class QueueManagementTest extends
         ManagedQueue managedQueue2 = _jmxUtils.getManagedQueue(queueName2);
         assertNull("Newly created queue2 does not have expected alternate exchange", managedQueue2.getAlternateExchange());
 
-        String altExchange2 = "amq.fanout";
+        String altExchange2 = nonMandatoryExchangeName;
         managedQueue2.setAlternateExchange(altExchange2);
 
         restartBroker();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Fri Jul  6 11:04:59 2012
@@ -28,20 +28,21 @@ import java.util.UUID;
 
 public interface ConfigurationRecoveryHandler
 {
-    QueueRecoveryHandler begin(MessageStore store);
+    ExchangeRecoveryHandler begin(MessageStore store);
 
-    public static interface QueueRecoveryHandler
+    public static interface ExchangeRecoveryHandler
     {
-        void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId);
-        ExchangeRecoveryHandler completeQueueRecovery();
+        void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
+        QueueRecoveryHandler completeExchangeRecovery();
     }
 
-    public static interface ExchangeRecoveryHandler
+    public static interface QueueRecoveryHandler
     {
-        void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
-        BindingRecoveryHandler completeExchangeRecovery();
+        void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId);
+        BindingRecoveryHandler completeQueueRecovery();
     }
 
+
     public static interface BindingRecoveryHandler
     {
         void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Fri Jul  6 11:04:59 2012
@@ -60,6 +60,7 @@ import org.apache.qpid.server.federation
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectHelper;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.Event;
@@ -78,6 +79,9 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
 
 /**
  * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
@@ -558,16 +562,17 @@ public class DerbyMessageStore implement
         try
         {
             List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
-            ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
-            _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
 
-            ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+            ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
             _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
 
-            ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+            QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
+            _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
+
+            BindingRecoveryHandler brh = qrh.completeQueueRecovery();
             _configuredObjectHelper.recoverBindings(brh, configuredObjects);
 
-            ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+            BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
             recoverBrokerLinks(lrh);
         }
         catch (SQLException e)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Fri Jul  6 11:04:59 2012
@@ -76,15 +76,12 @@ public class VirtualHostConfigRecoveryHa
 
     private final VirtualHost _virtualHost;
 
-    private MessageStoreLogSubject _logSubject;
-
-    private MessageStore _store;
-
     private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
-    private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
-    private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
-
+    private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
+    private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
 
+    private MessageStoreLogSubject _logSubject;
+    private MessageStore _store;
 
     public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
     {
@@ -131,12 +128,12 @@ public class VirtualHostConfigRecoveryHa
         }
         catch (AMQException e)
         {
-            // TODO
-            throw new RuntimeException(e);
+            throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e);
         }
     }
 
-    public ExchangeRecoveryHandler completeQueueRecovery()
+    @Override
+    public BindingRecoveryHandler completeQueueRecovery()
     {
         return this;
     }
@@ -156,19 +153,17 @@ public class VirtualHostConfigRecoveryHa
         }
         catch (AMQException e)
         {
-            // TODO
-            throw new RuntimeException(e);
+            throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e);
         }
     }
 
-    public BindingRecoveryHandler completeExchangeRecovery()
+    public QueueRecoveryHandler completeExchangeRecovery()
     {
         return this;
     }
 
     public StoredMessageRecoveryHandler begin()
     {
-        // TODO - log begin
         return this;
     }
 
@@ -193,7 +188,6 @@ public class VirtualHostConfigRecoveryHa
 
     public void completeMessageRecovery()
     {
-        //TODO - log end
     }
 
     public BridgeRecoveryHandler brokerLink(final UUID id,

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java Fri Jul  6 11:04:59 2012
@@ -113,9 +113,9 @@ public class DurableConfigurationStoreTe
         _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
 
         when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
-        when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_queueRecoveryHandler);
-        when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_exchangeRecoveryHandler);
-        when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_bindingRecoveryHandler);
+        when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler);
+        when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler);
+        when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler);
         when(_bindingRecoveryHandler.completeBindingRecovery()).thenReturn(_linkRecoveryHandler);
         when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
         when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org