You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/04/22 14:32:23 UTC
svn commit: r1589112 [2/2] - in /qpid/trunk/qpid/java/broker-core/src:
main/java/org/apache/qpid/server/
main/java/org/apache/qpid/server/configuration/startup/
main/java/org/apache/qpid/server/model/
main/java/org/apache/qpid/server/registry/ main/jav...
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1589112&r1=1589111&r2=1589112&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Tue Apr 22 12:32:23 2014
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.store;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -31,13 +28,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.startup.StoreUpgraderPhase;
-import org.apache.qpid.server.configuration.startup.UpgraderPhaseFactory;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
@@ -45,14 +37,11 @@ import org.apache.qpid.server.model.UUID
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class VirtualHostStoreUpgraderAndRecoverer
{
- private final ConfiguredObjectFactory _objectFactory;
private final VirtualHostNode<?> _virtualHostNode;
- private Map<String, UpgraderPhaseFactory> _upgraders = new HashMap<String, UpgraderPhaseFactory>();
+ private Map<String, StoreUpgraderPhase> _upgraders = new HashMap<String, StoreUpgraderPhase>();
@SuppressWarnings("serial")
private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>()
@@ -68,12 +57,11 @@ public class VirtualHostStoreUpgraderAnd
public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> virtualHostNode, ConfiguredObjectFactory objectFactory)
{
_virtualHostNode = virtualHostNode;
- _objectFactory = objectFactory;
- register(new UpgraderFactory_0_0());
- register(new UpgraderFactory_0_1());
- register(new UpgraderFactory_0_2());
- register(new UpgraderFactory_0_3());
- register(new UpgraderFactory_0_4());
+ register(new Upgrader_0_0_to_0_1());
+ register(new Upgrader_0_1_to_0_2());
+ register(new Upgrader_0_2_to_0_3());
+ register(new Upgrader_0_3_to_0_4());
+ register(new Upgrader_0_4_to_0_5());
Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
for (String exchangeName : DEFAULT_EXCHANGES.keySet())
@@ -84,9 +72,9 @@ public class VirtualHostStoreUpgraderAnd
_defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds);
}
- private void register(UpgraderPhaseFactory factory)
+ private void register(StoreUpgraderPhase upgrader)
{
- _upgraders.put(factory.getFromVersion(), factory);
+ _upgraders.put(upgrader.getFromVersion(), upgrader);
}
/*
@@ -94,100 +82,91 @@ public class VirtualHostStoreUpgraderAnd
* such bindings would have been ignored, starting from the point at which the config version changed, these
* arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue.
*/
- private class UpgraderFactory_0_0 extends UpgraderPhaseFactory
+ private class Upgrader_0_0_to_0_1 extends StoreUpgraderPhase
{
private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();
- public UpgraderFactory_0_0()
+ public Upgrader_0_0_to_0_1()
{
- super("0.0", "0.1");
+ super("modelVersion", "0.0", "0.1");
}
-
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(final ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase("modelVersion", getToVersion())
- {
+ _records.put(record.getId(), record);
+ }
- @Override
- public void configuredObject(final ConfiguredObjectRecord record)
- {
- _records.put(record.getId(), record);
- }
+ private void removeSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
- private void removeSelectorArguments(Map<String, Object> binding)
- {
- @SuppressWarnings("unchecked")
- Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+ FilterSupport.removeFilters(arguments);
+ binding.put(Binding.ARGUMENTS, arguments);
+ }
- FilterSupport.removeFilters(arguments);
- binding.put(Binding.ARGUMENTS, arguments);
- }
+ private boolean isTopicExchange(ConfiguredObjectRecord entry)
+ {
+ ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
+ if (exchangeRecord == null)
+ {
+ return false;
+ }
+ UUID exchangeId = exchangeRecord.getId();
- private boolean isTopicExchange(ConfiguredObjectRecord entry)
+ if(_records.containsKey(exchangeId))
+ {
+ return "topic".equals(_records.get(exchangeId)
+ .getAttributes()
+ .get(org.apache.qpid.server.model.Exchange.TYPE));
+ }
+ else
+ {
+ if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
{
- ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
- if (exchangeRecord == null)
- {
- return false;
- }
- UUID exchangeId = exchangeRecord.getId();
+ return true;
+ }
- if(_records.containsKey(exchangeId))
- {
- return "topic".equals(_records.get(exchangeId)
- .getAttributes()
- .get(org.apache.qpid.server.model.Exchange.TYPE));
- }
- else
- {
- if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
- {
- return true;
- }
+ return false;
+ }
- return false;
- }
+ }
- }
+ private boolean hasSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+ return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
+ }
- private boolean hasSelectorArguments(Map<String, Object> binding)
+ @Override
+ public void complete()
+ {
+ for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
+ {
+ ConfiguredObjectRecord record = entry.getValue();
+ String type = record.getType();
+ Map<String, Object> attributes = record.getAttributes();
+ UUID id = record.getId();
+ if ("org.apache.qpid.server.model.VirtualHost".equals(type))
{
- @SuppressWarnings("unchecked")
- Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
- return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
+ record = upgradeRootRecord(record);
}
-
- @Override
- public void complete()
+ else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
{
- for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
- {
- ConfiguredObjectRecord record = entry.getValue();
- String type = record.getType();
- Map<String, Object> attributes = record.getAttributes();
- UUID id = record.getId();
- if ("org.apache.qpid.server.model.VirtualHost".equals(type))
- {
- record = upgradeRootRecord(record);
- }
- else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
- {
- attributes = new LinkedHashMap<String, Object>(attributes);
- removeSelectorArguments(attributes);
-
- record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
- getUpdateMap().put(id, record);
- entry.setValue(record);
+ attributes = new LinkedHashMap<String, Object>(attributes);
+ removeSelectorArguments(attributes);
- }
- getNextUpgrader().configuredObject(record);
- }
+ record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
+ getUpdateMap().put(id, record);
+ entry.setValue(record);
- getNextUpgrader().complete();
}
- };
+ getNextUpgrader().configuredObject(record);
+ }
+
+ getNextUpgrader().complete();
}
}
@@ -196,76 +175,68 @@ public class VirtualHostStoreUpgraderAnd
* Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker
* configuration store). Also remove bindings which reference nonexistent queues or exchanges.
*/
- private class UpgraderFactory_0_1 extends UpgraderPhaseFactory
+ private class Upgrader_0_1_to_0_2 extends StoreUpgraderPhase
{
- protected UpgraderFactory_0_1()
+ public Upgrader_0_1_to_0_2()
{
- super("0.1", "0.2");
+ super("modelVersion", "0.1", "0.2");
}
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(final ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase("modelVersion", getToVersion())
+ String type = record.getType().substring(1 + record.getType().lastIndexOf('.'));
+ ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents());
+ getUpdateMap().put(record.getId(), newRecord);
+
+ if ("VirtualHost".equals(type))
{
+ newRecord = upgradeRootRecord(newRecord);
+ }
+ }
- @Override
- public void configuredObject(final ConfiguredObjectRecord record)
+ @Override
+ public void complete()
+ {
+ for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+ {
+ Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
+ final ConfiguredObjectRecord record = entry.getValue();
+ final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
+ final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName());
+ if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId())
+ || queueParent == null || unknownQueue(queueParent.getId())))
{
- String type = record.getType().substring(1 + record.getType().lastIndexOf('.'));
- ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents());
- getUpdateMap().put(record.getId(), newRecord);
-
- if ("VirtualHost".equals(type))
- {
- newRecord = upgradeRootRecord(newRecord);
- }
+ getDeleteMap().put(entry.getKey(), entry.getValue());
+ iterator.remove();
}
-
- @Override
- public void complete()
+ else
{
- for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
- {
- Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
- final ConfiguredObjectRecord record = entry.getValue();
- final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
- final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName());
- if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId())
- || queueParent == null || unknownQueue(queueParent.getId())))
- {
- getDeleteMap().put(entry.getKey(), entry.getValue());
- iterator.remove();
- }
- else
- {
- getNextUpgrader().configuredObject(record);
- }
- }
- getNextUpgrader().complete();
+ getNextUpgrader().configuredObject(record);
}
+ }
+ getNextUpgrader().complete();
+ }
- private boolean unknownExchange(final UUID exchangeId)
- {
- if (_defaultExchangeIds.containsValue(exchangeId))
- {
- return false;
- }
- ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
- return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()));
- }
+ private boolean unknownExchange(final UUID exchangeId)
+ {
+ if (_defaultExchangeIds.containsValue(exchangeId))
+ {
+ return false;
+ }
+ ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
+ return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()));
+ }
- private boolean unknownQueue(final UUID queueId)
- {
- ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
- return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()));
- }
+ private boolean unknownQueue(final UUID queueId)
+ {
+ ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
+ return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()));
+ }
- private boolean isBinding(final String type)
- {
- return Binding.class.getSimpleName().equals(type);
- }
- };
+ private boolean isBinding(final String type)
+ {
+ return Binding.class.getSimpleName().equals(type);
}
}
@@ -274,52 +245,46 @@ public class VirtualHostStoreUpgraderAnd
* Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the
* attributes into the map using the model attribute names rather than the wire attribute names
*/
- private class UpgraderFactory_0_2 extends UpgraderPhaseFactory
+ private class Upgrader_0_2_to_0_3 extends StoreUpgraderPhase
{
- protected UpgraderFactory_0_2()
+ private static final String ARGUMENTS = "arguments";
+
+ public Upgrader_0_2_to_0_3()
{
- super("0.2", "0.3");
+ super("modelVersion", "0.2", "0.3");
}
+ @SuppressWarnings("unchecked")
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase("modelVersion", getToVersion())
+ if("VirtualHost".equals(record.getType()))
{
- private static final String ARGUMENTS = "arguments";
-
- @SuppressWarnings("unchecked")
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
+ record = upgradeRootRecord(record);
+ }
+ else if("Queue".equals(record.getType()))
+ {
+ Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
+ if(record.getAttributes().get(ARGUMENTS) instanceof Map)
{
- if("VirtualHost".equals(record.getType()))
- {
- record = upgradeRootRecord(record);
- }
- else if("Queue".equals(record.getType()))
- {
- Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
- if(record.getAttributes().get(ARGUMENTS) instanceof Map)
- {
- newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
- .get(ARGUMENTS)));
- }
- newAttributes.putAll(record.getAttributes());
+ newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
+ .get(ARGUMENTS)));
+ }
+ newAttributes.putAll(record.getAttributes());
- record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
- }
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
+ }
- getNextUpgrader().configuredObject(record);
- }
+ getNextUpgrader().configuredObject(record);
+ }
- @Override
- public void complete()
- {
- getNextUpgrader().complete();
- }
- };
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
}
+
}
/*
@@ -327,387 +292,125 @@ public class VirtualHostStoreUpgraderAnd
* where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER"
* ensure OWNER is null unless the exclusivity policy is CONTAINER
*/
- private class UpgraderFactory_0_3 extends UpgraderPhaseFactory
+ private class Upgrader_0_3_to_0_4 extends StoreUpgraderPhase
{
- protected UpgraderFactory_0_3()
+ private static final String EXCLUSIVE = "exclusive";
+
+ public Upgrader_0_3_to_0_4()
{
- super("0.3", "0.4");
+ super("modelVersion", "0.3", "0.4");
}
+
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase("modelVersion", getToVersion())
+ if("VirtualHost".equals(record.getType()))
{
- private static final String EXCLUSIVE = "exclusive";
-
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
+ record = upgradeRootRecord(record);
+ }
+ else if(Queue.class.getSimpleName().equals(record.getType()))
+ {
+ Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes());
+ if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
{
- if("VirtualHost".equals(record.getType()))
+ boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE);
+ newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
+ if(!isExclusive && record.getAttributes().containsKey("owner"))
{
- record = upgradeRootRecord(record);
+ newAttributes.remove("owner");
}
- else if(Queue.class.getSimpleName().equals(record.getType()))
- {
- Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes());
- if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
- {
- boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE);
- newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
- if(!isExclusive && record.getAttributes().containsKey("owner"))
- {
- newAttributes.remove("owner");
- }
- }
- else
- {
- newAttributes.remove("owner");
- }
- if(!record.getAttributes().containsKey("durable"))
- {
- newAttributes.put("durable","true");
- }
-
- record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
- }
-
- getNextUpgrader().configuredObject(record);
}
-
- @Override
- public void complete()
+ else
{
- getNextUpgrader().complete();
+ newAttributes.remove("owner");
}
- };
- }
- }
-
- private class UpgraderFactory_0_4 extends UpgraderPhaseFactory
- {
- protected UpgraderFactory_0_4()
- {
- super("0.4", "2.0");
- }
-
- @Override
- public StoreUpgraderPhase newInstance()
- {
- return new StoreUpgraderPhase("modelVersion", getToVersion())
- {
- private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
- private static final String EXCHANGE_NAME = "name";
- private static final String EXCHANGE_TYPE = "type";
- private static final String EXCHANGE_DURABLE = "durable";
- private ConfiguredObjectRecord _virtualHostRecord;
-
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
+ if(!record.getAttributes().containsKey("durable"))
{
- if("VirtualHost".equals(record.getType()))
- {
- record = upgradeRootRecord(record);
- Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes());
- virtualHostAttributes.put("name", _virtualHostNode.getName());
- virtualHostAttributes.put("modelVersion", getToVersion());
- record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, ConfiguredObjectRecord>emptyMap());
- _virtualHostRecord = record;
- }
- else if("Exchange".equals(record.getType()))
- {
- Map<String, Object> attributes = record.getAttributes();
- String name = (String)attributes.get(EXCHANGE_NAME);
- _missingAmqpExchanges.remove(name);
- }
- getNextUpgrader().configuredObject(record);
+ newAttributes.put("durable","true");
}
- @Override
- public void complete()
- {
- for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
- {
- String name = entry.getKey();
- String type = entry.getValue();
- UUID id = _defaultExchangeIds.get(name);
-
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(EXCHANGE_NAME, name);
- attributes.put(EXCHANGE_TYPE, type);
- attributes.put(EXCHANGE_DURABLE, true);
-
- ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord));
- getUpdateMap().put(id, record);
-
- getNextUpgrader().configuredObject(record);
-
- }
+ record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
+ }
- getNextUpgrader().complete();
- }
- };
+ getNextUpgrader().configuredObject(record);
}
- }
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
+ }
- public void perform(DurableConfigurationStore durableConfigurationStore)
- {
- UpgradeAndRecoveryHandler vhrh = new UpgradeAndRecoveryHandler(_virtualHostNode, _objectFactory, durableConfigurationStore, _upgraders);
- durableConfigurationStore.visitConfiguredObjectRecords(vhrh);
}
- //TODO: generalize this class
- private static class UpgradeAndRecoveryHandler implements ConfiguredObjectRecordHandler
+ private class Upgrader_0_4_to_0_5 extends StoreUpgraderPhase
{
- private static Logger LOGGER = Logger.getLogger(UpgradeAndRecoveryHandler.class);
-
- private final Map<UUID, ConfiguredObjectRecord> _records = new LinkedHashMap<UUID, ConfiguredObjectRecord>();
- private Map<String, UpgraderPhaseFactory> _upgraders;
-
- private final VirtualHostNode<?> _parent;
- private final ConfiguredObjectFactory _configuredObjectFactory;
- private final DurableConfigurationStore _store;
-
- public UpgradeAndRecoveryHandler(VirtualHostNode<?> parent, ConfiguredObjectFactory configuredObjectFactory, DurableConfigurationStore durableConfigurationStore, Map<String, UpgraderPhaseFactory> upgraders)
- {
- super();
- _parent = parent;
- _configuredObjectFactory = configuredObjectFactory;
- _upgraders = upgraders;
- _store = durableConfigurationStore;
- }
+ private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
+ private static final String EXCHANGE_NAME = "name";
+ private static final String EXCHANGE_TYPE = "type";
+ private static final String EXCHANGE_DURABLE = "durable";
+ private ConfiguredObjectRecord _virtualHostRecord;
- @Override
- public void begin()
+ public Upgrader_0_4_to_0_5()
{
+ super("modelVersion", "0.4", "2.0");
}
@Override
- public boolean handle(final ConfiguredObjectRecord record)
- {
- _records.put(record.getId(), record);
- return true;
- }
-
- @Override
- public void end()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- String version = getCurrentVersion();
-
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Store has model version " + version + ". Number of record(s) " + _records.size());
- }
-
- DurableConfigurationStoreUpgrader upgrader = buildUpgraderChain(version);
-
- for(ConfiguredObjectRecord record : _records.values())
- {
- upgrader.configuredObject(record);
- }
-
- upgrader.complete();
-
- Map<UUID, ConfiguredObjectRecord> deletedRecords = upgrader.getDeletedRecords();
- Map<UUID, ConfiguredObjectRecord> updatedRecords = upgrader.getUpdatedRecords();
-
- if (LOGGER.isDebugEnabled())
+ if("VirtualHost".equals(record.getType()))
{
- LOGGER.debug("VirtualHost store upgrade: " + deletedRecords.size() + " record(s) deleted");
- LOGGER.debug("VirtualHost store upgrade: " + updatedRecords.size() + " record(s) updated");
- LOGGER.debug("VirtualHost store upgrade: " + _records.size() + " total record(s)");
+ record = upgradeRootRecord(record);
+ Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes());
+ virtualHostAttributes.put("name", _virtualHostNode.getName());
+ virtualHostAttributes.put("modelVersion", getToVersion());
+ record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, ConfiguredObjectRecord>emptyMap());
+ _virtualHostRecord = record;
}
-
- _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()]));
- _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()]));
-
- _records.keySet().removeAll(deletedRecords.keySet());
- _records.putAll(updatedRecords);
-
- ConfiguredObjectRecord virtualHostRecord = null;
- for (ConfiguredObjectRecord record : _records.values())
+ else if("Exchange".equals(record.getType()))
{
- LOGGER.debug("Found type " + record.getType());
- if ("VirtualHost".equals(record.getType()))
- {
- virtualHostRecord = record;
- break;
- }
- }
-
- if (virtualHostRecord != null)
- {
- String parentCategory = _parent.getCategoryClass().getSimpleName();
- ConfiguredObjectRecord parentRecord = new ConfiguredObjectRecordImpl(_parent.getId(), parentCategory, Collections.<String, Object>emptyMap());
- Map<String, ConfiguredObjectRecord> rootParents = Collections.<String, ConfiguredObjectRecord>singletonMap(parentCategory, parentRecord);
- _records.put(virtualHostRecord.getId(), new ConfiguredObjectRecordImpl(virtualHostRecord.getId(), VirtualHost.class.getSimpleName(), virtualHostRecord.getAttributes(), rootParents));
- Collection<ConfiguredObjectRecord> records = _records.values();
- resolveObjects(_configuredObjectFactory, _parent, records.toArray(new ConfiguredObjectRecord[records.size()]));
+ Map<String, Object> attributes = record.getAttributes();
+ String name = (String)attributes.get(EXCHANGE_NAME);
+ _missingAmqpExchanges.remove(name);
}
+ getNextUpgrader().configuredObject(record);
}
- private DurableConfigurationStoreUpgrader buildUpgraderChain(String version)
- {
- DurableConfigurationStoreUpgrader head = null;
- while(!BrokerModel.MODEL_VERSION.equals(version))
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Adding virtual host store upgrader from model version: " + version);
- }
- final UpgraderPhaseFactory upgraderPhaseFactory = _upgraders.get(version);
- StoreUpgraderPhase upgrader = upgraderPhaseFactory.newInstance();
- if(head == null)
- {
- head = upgrader;
- }
- else
- {
- head.setNextUpgrader(upgrader);
- }
- version = upgraderPhaseFactory.getToVersion();
- }
-
- if(head == null)
- {
- head = new NullUpgrader();
- }
- else
- {
- head.setNextUpgrader(new NullUpgrader());
- }
-
- return head;
- }
-
- private String getCurrentVersion()
+ @Override
+ public void complete()
{
- for(ConfiguredObjectRecord record : _records.values())
+ for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
{
- if(record.getType().equals("VirtualHost"))
- {
- return (String) record.getAttributes().get(VirtualHost.MODEL_VERSION);
- }
- }
- return BrokerModel.MODEL_VERSION;
- }
+ String name = entry.getKey();
+ String type = entry.getValue();
+ UUID id = _defaultExchangeIds.get(name);
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(EXCHANGE_NAME, name);
+ attributes.put(EXCHANGE_TYPE, type);
+ attributes.put(EXCHANGE_DURABLE, true);
- public void resolveObjects(ConfiguredObjectFactory factory, ConfiguredObject<?> root, ConfiguredObjectRecord... records)
- {
- Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>();
- resolvedObjects.put(root.getId(), root);
-
- Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records));
- Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies =
- new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>();
+ ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord));
+ getUpdateMap().put(id, record);
- boolean updatesMade;
+ getNextUpgrader().configuredObject(record);
- do
- {
- updatesMade = false;
- Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator();
- while (iter.hasNext())
- {
- ConfiguredObjectRecord record = iter.next();
- Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>();
- boolean foundParents = true;
- for (ConfiguredObjectRecord parent : record.getParents().values())
- {
- if (!resolvedObjects.containsKey(parent.getId()))
- {
- foundParents = false;
- break;
- }
- else
- {
- parents.add(resolvedObjects.get(parent.getId()));
- }
- }
- if (foundParents)
- {
- iter.remove();
- UnresolvedConfiguredObject<? extends ConfiguredObject> recovered =
- factory.recover(record, parents.toArray(new ConfiguredObject<?>[parents.size()]));
- Collection<ConfiguredObjectDependency<?>> dependencies =
- recovered.getUnresolvedDependencies();
- if (dependencies.isEmpty())
- {
- updatesMade = true;
- ConfiguredObject<?> resolved = recovered.resolve();
- resolvedObjects.put(resolved.getId(), resolved);
- }
- else
- {
- recordsWithUnresolvedDependencies.add(recovered);
- }
- }
-
- }
-
- Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter =
- recordsWithUnresolvedDependencies.iterator();
-
- while(unresolvedIter.hasNext())
- {
- UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next();
- Collection<ConfiguredObjectDependency<?>> dependencies =
- new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies());
+ }
- for(ConfiguredObjectDependency dependency : dependencies)
- {
- if(dependency instanceof ConfiguredObjectIdDependency)
- {
- UUID id = ((ConfiguredObjectIdDependency)dependency).getId();
- if(resolvedObjects.containsKey(id))
- {
- dependency.resolve(resolvedObjects.get(id));
- }
- }
- else if(dependency instanceof ConfiguredObjectNameDependency)
- {
- ConfiguredObject<?> dependentObject = null;
- for(ConfiguredObject<?> parent : unresolvedObject.getParents())
- {
- dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName());
- if(dependentObject != null)
- {
- break;
- }
- }
- if(dependentObject != null)
- {
- dependency.resolve(dependentObject);
- }
- }
- else
- {
- throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName());
- }
- }
- if(unresolvedObject.getUnresolvedDependencies().isEmpty())
- {
- updatesMade = true;
- unresolvedIter.remove();
- ConfiguredObject<?> resolved = unresolvedObject.resolve();
- resolvedObjects.put(resolved.getId(), resolved);
- }
- }
+ getNextUpgrader().complete();
+ }
- } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty()));
+ }
- if(!recordsWithUnresolvedDependencies.isEmpty())
- {
- throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies);
- }
- if(!recordsWithUnresolvedParents.isEmpty())
- {
- throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents);
- }
- }
+ public void perform(DurableConfigurationStore durableConfigurationStore)
+ {
+ String virtualHostCategory = VirtualHost.class.getSimpleName();
+ GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders);
+ upgraderHandler.upgrade();
+ new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords());
}
}
Copied: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java (from r1588966, qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java?p2=qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java&p1=qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java&r1=1588966&r2=1589112&rev=1589112&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java Tue Apr 22 12:32:23 2014
@@ -18,11 +18,12 @@
* under the License.
*
*/
-package org.apache.qpid.server.configuration.startup;
+package org.apache.qpid.server.store;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -31,9 +32,7 @@ import java.util.UUID;
import junit.framework.TestCase;
import org.apache.qpid.server.BrokerOptions;
-import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.RecovererProvider;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
@@ -49,6 +48,7 @@ import org.apache.qpid.server.model.Syst
import org.apache.qpid.server.model.SystemContextImpl;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
+import org.apache.qpid.server.store.GenericRecoverer;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
public class BrokerRecovererTest extends TestCase
@@ -56,9 +56,9 @@ public class BrokerRecovererTest extends
private ConfiguredObjectRecord _brokerEntry = mock(ConfiguredObjectRecord.class);
private UUID _brokerId = UUID.randomUUID();
- private AuthenticationProvider _authenticationProvider1;
+ private AuthenticationProvider<?> _authenticationProvider1;
private UUID _authenticationProvider1Id = UUID.randomUUID();
- private SystemContext _systemContext;
+ private SystemContext<?> _systemContext;
private ConfiguredObjectFactory _configuredObjectFactory;
private TaskExecutor _taskExecutor;
@@ -91,8 +91,14 @@ public class BrokerRecovererTest extends
@Override
protected void tearDown() throws Exception
{
- super.tearDown();
- _taskExecutor.stop();
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ _taskExecutor.stop();
+ }
}
public void testCreateBrokerAttributes()
@@ -115,8 +121,8 @@ public class BrokerRecovererTest extends
when(_brokerEntry.getAttributes()).thenReturn(entryAttributes);
- _systemContext.resolveObjects(_brokerEntry);
- Broker broker = _systemContext.getBroker();
+ resolveObjects(_brokerEntry);
+ Broker<?> broker = _systemContext.getBroker();
assertNotNull(broker);
@@ -171,7 +177,7 @@ public class BrokerRecovererTest extends
UUID authProviderId = UUID.randomUUID();
UUID portId = UUID.randomUUID();
- _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord(
+ resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord(
portId,
5672,
"authProvider"));
@@ -188,7 +194,7 @@ public class BrokerRecovererTest extends
{
UUID authProviderId = UUID.randomUUID();
- _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"));
+ resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"));
Broker<?> broker = _systemContext.getBroker();
@@ -206,7 +212,7 @@ public class BrokerRecovererTest extends
UUID authProvider2Id = UUID.randomUUID();
UUID port2Id = UUID.randomUUID();
- _systemContext.resolveObjects(_brokerEntry,
+ resolveObjects(_brokerEntry,
createAuthProviderRecord(authProviderId, "authProvider"),
createPortRecord(portId, 5672, "authProvider"),
createAuthProviderRecord(authProvider2Id, "authProvider2"),
@@ -228,7 +234,7 @@ public class BrokerRecovererTest extends
UUID authProviderId = UUID.randomUUID();
- _systemContext.resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider"));
+ resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider"));
Broker<?> broker = _systemContext.getBroker();
@@ -253,7 +259,7 @@ public class BrokerRecovererTest extends
try
{
- _systemContext.resolveObjects(_brokerEntry);
+ resolveObjects(_brokerEntry);
Broker<?> broker = _systemContext.getBroker();
broker.open();
fail("The broker creation should fail due to unsupported model version");
@@ -323,33 +329,10 @@ public class BrokerRecovererTest extends
return String.valueOf(attributeValue);
}
- private RecovererProvider createRecoveryProvider(final ConfiguredObjectRecord[] entries, final ConfiguredObject[] objectsToRecoverer)
+ private void resolveObjects(ConfiguredObjectRecord... records)
{
- RecovererProvider recovererProvider = new RecovererProvider()
- {
- @Override
- public ConfiguredObjectRecoverer<? extends ConfiguredObject> getRecoverer(String type)
- {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final ConfiguredObjectRecoverer<? extends ConfiguredObject> recoverer = new ConfiguredObjectRecoverer()
- {
- public ConfiguredObject create(RecovererProvider recovererProvider, ConfiguredObjectRecord entry, ConfiguredObject... parents)
- {
- for (int i = 0; i < entries.length; i++)
- {
- ConfiguredObjectRecord e = entries[i];
- if (entry == e)
- {
- return objectsToRecoverer[i];
- }
- }
- return null;
- }
- };
-
- return recoverer;
- }
- };
- return recovererProvider;
+ GenericRecoverer recoverer = new GenericRecoverer(_systemContext, Broker.class.getSimpleName());
+ recoverer.recover(Arrays.asList(records));
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org