You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/07/06 13:04:59 UTC
svn commit: r1358118 - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/
broker/src/main/java/org/apache/qpid/server/store/ broker/src/main...
Author: kwall
Date: Fri Jul 6 11:04:59 2012
New Revision: 1358118
URL: http://svn.apache.org/viewvc?rev=1358118&view=rev
Log:
QPID-4112: Virtualhosts recover exchanges before queues
Switch the recover order from queues,exchanges,... to exchanges,queues,.. so that when a queue with an alternate exchange
is recovered, the exchange's uuid is already in the registry.
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Jul 6 11:04:59 2012
@@ -60,6 +60,7 @@ import org.apache.qpid.server.message.En
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.*;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
@@ -413,16 +414,16 @@ public abstract class AbstractBDBMessage
try
{
List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
- ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
_configuredObjectHelper.recoverExchanges(erh, configuredObjects);
- BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
+ _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
+
+ BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+ BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
recoverBrokerLinks(lrh);
}
catch (DatabaseException e)
Modified: qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java Fri Jul 6 11:04:59 2012
@@ -261,13 +261,18 @@ public class QueueManagementTest extends
*/
public void testAlternateExchangeSurvivesRestart() throws Exception
{
+ String nonMandatoryExchangeName = "exch" + getName();
+
+ final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
+ managedBroker.createNewExchange(nonMandatoryExchangeName, "fanout", true);
+
String queueName1 = getTestQueueName() + "1";
String altExchange1 = "amq.fanout";
String addr1WithAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName1, altExchange1);
Queue queue1 = _session.createQueue(addr1WithAltExch);
String queueName2 = getTestQueueName() + "2";
- String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,}}", queueName2);
+ String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue}}", queueName2);
Queue queue2 = _session.createQueue(addr2WithoutAltExch);
createQueueOnBroker(queue1);
@@ -279,7 +284,7 @@ public class QueueManagementTest extends
ManagedQueue managedQueue2 = _jmxUtils.getManagedQueue(queueName2);
assertNull("Newly created queue2 does not have expected alternate exchange", managedQueue2.getAlternateExchange());
- String altExchange2 = "amq.fanout";
+ String altExchange2 = nonMandatoryExchangeName;
managedQueue2.setAlternateExchange(altExchange2);
restartBroker();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Fri Jul 6 11:04:59 2012
@@ -28,20 +28,21 @@ import java.util.UUID;
public interface ConfigurationRecoveryHandler
{
- QueueRecoveryHandler begin(MessageStore store);
+ ExchangeRecoveryHandler begin(MessageStore store);
- public static interface QueueRecoveryHandler
+ public static interface ExchangeRecoveryHandler
{
- void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId);
- ExchangeRecoveryHandler completeQueueRecovery();
+ void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
+ QueueRecoveryHandler completeExchangeRecovery();
}
- public static interface ExchangeRecoveryHandler
+ public static interface QueueRecoveryHandler
{
- void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
- BindingRecoveryHandler completeExchangeRecovery();
+ void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId);
+ BindingRecoveryHandler completeQueueRecovery();
}
+
public static interface BindingRecoveryHandler
{
void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Fri Jul 6 11:04:59 2012
@@ -60,6 +60,7 @@ import org.apache.qpid.server.federation
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectHelper;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.Event;
@@ -78,6 +79,9 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
/**
* An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
@@ -558,16 +562,17 @@ public class DerbyMessageStore implement
try
{
List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
- ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
_configuredObjectHelper.recoverExchanges(erh, configuredObjects);
- ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
+ _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
+
+ BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+ BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
recoverBrokerLinks(lrh);
}
catch (SQLException e)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Fri Jul 6 11:04:59 2012
@@ -76,15 +76,12 @@ public class VirtualHostConfigRecoveryHa
private final VirtualHost _virtualHost;
- private MessageStoreLogSubject _logSubject;
-
- private MessageStore _store;
-
private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
- private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
-
+ private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
+ private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
+ private MessageStoreLogSubject _logSubject;
+ private MessageStore _store;
public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
{
@@ -131,12 +128,12 @@ public class VirtualHostConfigRecoveryHa
}
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e);
}
}
- public ExchangeRecoveryHandler completeQueueRecovery()
+ @Override
+ public BindingRecoveryHandler completeQueueRecovery()
{
return this;
}
@@ -156,19 +153,17 @@ public class VirtualHostConfigRecoveryHa
}
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e);
}
}
- public BindingRecoveryHandler completeExchangeRecovery()
+ public QueueRecoveryHandler completeExchangeRecovery()
{
return this;
}
public StoredMessageRecoveryHandler begin()
{
- // TODO - log begin
return this;
}
@@ -193,7 +188,6 @@ public class VirtualHostConfigRecoveryHa
public void completeMessageRecovery()
{
- //TODO - log end
}
public BridgeRecoveryHandler brokerLink(final UUID id,
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java?rev=1358118&r1=1358117&r2=1358118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java Fri Jul 6 11:04:59 2012
@@ -113,9 +113,9 @@ public class DurableConfigurationStoreTe
_dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
- when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_queueRecoveryHandler);
- when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_exchangeRecoveryHandler);
- when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_bindingRecoveryHandler);
+ when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler);
+ when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler);
+ when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler);
when(_bindingRecoveryHandler.completeBindingRecovery()).thenReturn(_linkRecoveryHandler);
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org