You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/04/21 16:28:32 UTC
svn commit: r1588886 [3/6] - in /qpid/trunk/qpid/java:
bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/src/main/java/org/apache/qpid/server/store/b...
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1588886&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Mon Apr 21 14:28:29 2014
@@ -0,0 +1,713 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.startup.StoreUpgraderPhase;
+import org.apache.qpid.server.configuration.startup.UpgraderPhaseFactory;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
+public class VirtualHostStoreUpgraderAndRecoverer
+{
+ private final ConfiguredObjectFactory _objectFactory;
+ private final VirtualHostNode<?> _virtualHostNode;
+ private Map<String, UpgraderPhaseFactory> _upgraders = new HashMap<String, UpgraderPhaseFactory>();
+
+ @SuppressWarnings("serial")
+ private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ put("amq.direct", "direct");
+ put("amq.topic", "topic");
+ put("amq.fanout", "fanout");
+ put("amq.match", "headers");
+ }});
+
+ private final Map<String, UUID> _defaultExchangeIds;
+
+ public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> virtualHostNode, ConfiguredObjectFactory objectFactory)
+ {
+ _virtualHostNode = virtualHostNode;
+ _objectFactory = objectFactory;
+ register(new UpgraderFactory_0_0());
+ register(new UpgraderFactory_0_1());
+ register(new UpgraderFactory_0_2());
+ register(new UpgraderFactory_0_3());
+ register(new UpgraderFactory_0_4());
+
+ Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
+ for (String exchangeName : DEFAULT_EXCHANGES.keySet())
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, virtualHostNode.getName());
+ defaultExchangeIds.put(exchangeName, id);
+ }
+ _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds);
+ }
+
+ private void register(UpgraderPhaseFactory factory)
+ {
+ _upgraders.put(factory.getFromVersion(), factory);
+ }
+
+ /*
+  * Removes filters from queue bindings to exchanges other than topic exchanges. In older versions of the broker
+  * such bindings would have been ignored; starting from the point at which the config version changed, these
+  * arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue.
+  */
+ private class UpgraderFactory_0_0 extends UpgraderPhaseFactory
+ {
+     public UpgraderFactory_0_0()
+     {
+         super("0.0", "0.1");
+     }
+
+     /*
+      * Creates a phase which buffers every record it receives and only rewrites and forwards them
+      * to the next upgrader when complete() is called.
+      */
+     @Override
+     public StoreUpgraderPhase newInstance()
+     {
+         return new StoreUpgraderPhase("modelVersion", getToVersion())
+         {
+             // Owned by the phase instance rather than by the factory, so that records buffered by one
+             // phase can never be seen by another phase created from the same factory.
+             private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();
+
+             @Override
+             public void configuredObject(final ConfiguredObjectRecord record)
+             {
+                 _records.put(record.getId(), record);
+             }
+
+             /* Replaces the binding's arguments with a copy from which the filters have been removed. */
+             private void removeSelectorArguments(Map<String, Object> binding)
+             {
+                 @SuppressWarnings("unchecked")
+                 Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+
+                 FilterSupport.removeFilters(arguments);
+                 binding.put(Binding.ARGUMENTS, arguments);
+             }
+
+             /*
+              * Returns true if the "Exchange" parent of the given record is of type "topic". An exchange
+              * without a buffered record is only treated as a topic exchange if its id is that of amq.topic.
+              */
+             private boolean isTopicExchange(ConfiguredObjectRecord entry)
+             {
+                 ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
+                 if (exchangeRecord == null)
+                 {
+                     return false;
+                 }
+                 UUID exchangeId = exchangeRecord.getId();
+
+                 if (_records.containsKey(exchangeId))
+                 {
+                     return "topic".equals(_records.get(exchangeId)
+                                                   .getAttributes()
+                                                   .get(org.apache.qpid.server.model.Exchange.TYPE));
+                 }
+                 return _defaultExchangeIds.get("amq.topic").equals(exchangeId);
+             }
+
+             /* Returns true if the binding has an arguments map which contains a filter. */
+             private boolean hasSelectorArguments(Map<String, Object> binding)
+             {
+                 @SuppressWarnings("unchecked")
+                 Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+                 return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
+             }
+
+             @Override
+             public void complete()
+             {
+                 for (Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
+                 {
+                     ConfiguredObjectRecord record = entry.getValue();
+                     String type = record.getType();
+                     Map<String, Object> attributes = record.getAttributes();
+                     UUID id = record.getId();
+                     // At this model version the type is still the fully qualified class name
+                     if ("org.apache.qpid.server.model.VirtualHost".equals(type))
+                     {
+                         record = upgradeRootRecord(record);
+                     }
+                     else if (type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
+                     {
+                         // Work on a copy of the attributes; the stored record's map is left untouched
+                         attributes = new LinkedHashMap<String, Object>(attributes);
+                         removeSelectorArguments(attributes);
+
+                         record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
+                         getUpdateMap().put(id, record);
+                         entry.setValue(record);
+                     }
+                     getNextUpgrader().configuredObject(record);
+                 }
+
+                 getNextUpgrader().complete();
+             }
+         };
+     }
+
+ }
+
+ /*
+  * Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker
+  * configuration store). Also remove bindings which reference nonexistent queues or exchanges.
+  */
+ private class UpgraderFactory_0_1 extends UpgraderPhaseFactory
+ {
+     protected UpgraderFactory_0_1()
+     {
+         super("0.1", "0.2");
+     }
+
+     @Override
+     public StoreUpgraderPhase newInstance()
+     {
+         return new StoreUpgraderPhase("modelVersion", getToVersion())
+         {
+
+             /* Stores a copy of the record whose type is reduced to the part after the last '.'. */
+             @Override
+             public void configuredObject(final ConfiguredObjectRecord record)
+             {
+                 final String qualifiedType = record.getType();
+                 final String simpleType = qualifiedType.substring(qualifiedType.lastIndexOf('.') + 1);
+                 final ConfiguredObjectRecord renamed =
+                         new ConfiguredObjectRecordImpl(record.getId(), simpleType, record.getAttributes(), record.getParents());
+                 getUpdateMap().put(record.getId(), renamed);
+
+                 if ("VirtualHost".equals(simpleType))
+                 {
+                     upgradeRootRecord(renamed);
+                 }
+             }
+
+             /*
+              * Walks the update map: bindings lacking a known exchange or queue are moved to the delete
+              * map, everything else is passed on to the next upgrader.
+              */
+             @Override
+             public void complete()
+             {
+                 final Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> entries = getUpdateMap().entrySet().iterator();
+                 while (entries.hasNext())
+                 {
+                     final Map.Entry<UUID, ConfiguredObjectRecord> entry = entries.next();
+                     final ConfiguredObjectRecord record = entry.getValue();
+                     final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
+                     final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName());
+
+                     if (isBinding(record.getType()) && isDangling(exchangeParent, queueParent))
+                     {
+                         getDeleteMap().put(entry.getKey(), entry.getValue());
+                         entries.remove();
+                     }
+                     else
+                     {
+                         getNextUpgrader().configuredObject(record);
+                     }
+                 }
+                 getNextUpgrader().complete();
+             }
+
+             /* True if either parent is absent or refers to an exchange/queue that is not known. */
+             private boolean isDangling(final ConfiguredObjectRecord exchangeParent, final ConfiguredObjectRecord queueParent)
+             {
+                 if (exchangeParent == null || unknownExchange(exchangeParent.getId()))
+                 {
+                     return true;
+                 }
+                 return queueParent == null || unknownQueue(queueParent.getId());
+             }
+
+             private boolean unknownExchange(final UUID exchangeId)
+             {
+                 // The default exchanges are always considered known, even without a stored record
+                 if (_defaultExchangeIds.containsValue(exchangeId))
+                 {
+                     return false;
+                 }
+                 final ConfiguredObjectRecord candidate = getUpdateMap().get(exchangeId);
+                 return candidate == null || !candidate.getType().equals(Exchange.class.getSimpleName());
+             }
+
+             private boolean unknownQueue(final UUID queueId)
+             {
+                 final ConfiguredObjectRecord candidate = getUpdateMap().get(queueId);
+                 return candidate == null || !candidate.getType().equals(Queue.class.getSimpleName());
+             }
+
+             private boolean isBinding(final String type)
+             {
+                 final String bindingType = Binding.class.getSimpleName();
+                 return bindingType.equals(type);
+             }
+         };
+     }
+ }
+
+
+ /*
+  * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the
+  * attributes into the map using the model attribute names rather than the wire attribute names
+  */
+ private class UpgraderFactory_0_2 extends UpgraderPhaseFactory
+ {
+     protected UpgraderFactory_0_2()
+     {
+         super("0.2", "0.3");
+     }
+
+     @Override
+     public StoreUpgraderPhase newInstance()
+     {
+         return new StoreUpgraderPhase("modelVersion", getToVersion())
+         {
+             private static final String ARGUMENTS = "arguments";
+
+             @Override
+             public void configuredObject(ConfiguredObjectRecord record)
+             {
+                 final String type = record.getType();
+                 if ("VirtualHost".equals(type))
+                 {
+                     record = upgradeRootRecord(record);
+                 }
+                 else if ("Queue".equals(type))
+                 {
+                     record = flattenArguments(record);
+                     getUpdateMap().put(record.getId(), record);
+                 }
+
+                 getNextUpgrader().configuredObject(record);
+             }
+
+             /*
+              * Builds a copy of the queue record in which the converted wire arguments come first,
+              * followed by (and overridden by) the record's existing attributes.
+              */
+             @SuppressWarnings("unchecked")
+             private ConfiguredObjectRecord flattenArguments(final ConfiguredObjectRecord queue)
+             {
+                 final Map<String, Object> flattened = new LinkedHashMap<String, Object>();
+                 final Object wireArguments = queue.getAttributes().get(ARGUMENTS);
+                 if (wireArguments instanceof Map)
+                 {
+                     final Map<String, Object> modelArguments =
+                             QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) wireArguments);
+                     flattened.putAll(modelArguments);
+                 }
+                 flattened.putAll(queue.getAttributes());
+
+                 return new ConfiguredObjectRecordImpl(queue.getId(), queue.getType(), flattened, queue.getParents());
+             }
+
+             @Override
+             public void complete()
+             {
+                 // Nothing is buffered by this phase; just propagate completion
+                 getNextUpgrader().complete();
+             }
+         };
+     }
+ }
+
+ /*
+  * Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum
+  * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER"
+  * ensure OWNER is null unless the exclusivity policy is CONTAINER
+  */
+ private class UpgraderFactory_0_3 extends UpgraderPhaseFactory
+ {
+     protected UpgraderFactory_0_3()
+     {
+         super("0.3", "0.4");
+     }
+
+     @Override
+     public StoreUpgraderPhase newInstance()
+     {
+         return new StoreUpgraderPhase("modelVersion", getToVersion())
+         {
+             private static final String EXCLUSIVE = "exclusive";
+
+             @Override
+             public void configuredObject(ConfiguredObjectRecord record)
+             {
+                 if ("VirtualHost".equals(record.getType()))
+                 {
+                     record = upgradeRootRecord(record);
+                 }
+                 else if (Queue.class.getSimpleName().equals(record.getType()))
+                 {
+                     final Map<String, Object> stored = record.getAttributes();
+                     final Map<String, Object> upgraded = new LinkedHashMap<String, Object>(stored);
+
+                     // The owner survives only when the queue was stored as exclusive == Boolean.TRUE
+                     boolean keepOwner = false;
+                     final Object exclusive = stored.get(EXCLUSIVE);
+                     if (exclusive instanceof Boolean)
+                     {
+                         keepOwner = (Boolean) exclusive;
+                         upgraded.put(EXCLUSIVE, keepOwner ? "CONTAINER" : "NONE");
+                     }
+                     if (!keepOwner)
+                     {
+                         upgraded.remove("owner");
+                     }
+
+                     // Queues stored without a durable attribute get one
+                     if (!stored.containsKey("durable"))
+                     {
+                         upgraded.put("durable","true");
+                     }
+
+                     record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), upgraded, record.getParents());
+                     getUpdateMap().put(record.getId(), record);
+                 }
+
+                 getNextUpgrader().configuredObject(record);
+             }
+
+             @Override
+             public void complete()
+             {
+                 // Nothing is buffered by this phase; just propagate completion
+                 getNextUpgrader().complete();
+             }
+         };
+     }
+ }
+
+ /*
+  * Rewrites the virtual host record so that it carries the name of the virtual host node and the new
+  * model version and has no parents, and creates records for any of the default exchanges
+  * (see DEFAULT_EXCHANGES) which were not found in the store.
+  */
+ private class UpgraderFactory_0_4 extends UpgraderPhaseFactory
+ {
+     protected UpgraderFactory_0_4()
+     {
+         super("0.4", "2.0");
+     }
+
+     @Override
+     public StoreUpgraderPhase newInstance()
+     {
+         return new StoreUpgraderPhase("modelVersion", getToVersion())
+         {
+             private static final String EXCHANGE_NAME = "name";
+             private static final String EXCHANGE_TYPE = "type";
+             private static final String EXCHANGE_DURABLE = "durable";
+
+             // Starts as all default exchanges; names are struck off as exchange records are seen
+             private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
+             private ConfiguredObjectRecord _virtualHostRecord;
+
+             @Override
+             public void configuredObject(ConfiguredObjectRecord record)
+             {
+                 final String type = record.getType();
+                 if ("VirtualHost".equals(type))
+                 {
+                     final ConfiguredObjectRecord root = upgradeRootRecord(record);
+                     final Map<String, Object> hostAttributes = new HashMap<String, Object>(root.getAttributes());
+                     hostAttributes.put("name", _virtualHostNode.getName());
+                     hostAttributes.put("modelVersion", getToVersion());
+                     record = new ConfiguredObjectRecordImpl(root.getId(),
+                                                             "VirtualHost",
+                                                             hostAttributes,
+                                                             Collections.<String, ConfiguredObjectRecord>emptyMap());
+                     _virtualHostRecord = record;
+                 }
+                 else if ("Exchange".equals(type))
+                 {
+                     final String exchangeName = (String) record.getAttributes().get(EXCHANGE_NAME);
+                     _missingAmqpExchanges.remove(exchangeName);
+                 }
+                 getNextUpgrader().configuredObject(record);
+             }
+
+             @Override
+             public void complete()
+             {
+                 for (Entry<String, String> missing : _missingAmqpExchanges.entrySet())
+                 {
+                     final ConfiguredObjectRecord exchange = newExchangeRecord(missing.getKey(), missing.getValue());
+                     getUpdateMap().put(exchange.getId(), exchange);
+                     getNextUpgrader().configuredObject(exchange);
+                 }
+
+                 getNextUpgrader().complete();
+             }
+
+             /* Builds a durable exchange record, parented by the virtual host record, with the pre-computed id. */
+             private ConfiguredObjectRecord newExchangeRecord(final String name, final String exchangeType)
+             {
+                 final Map<String, Object> attributes = new HashMap<String, Object>();
+                 attributes.put(EXCHANGE_NAME, name);
+                 attributes.put(EXCHANGE_TYPE, exchangeType);
+                 attributes.put(EXCHANGE_DURABLE, true);
+
+                 final Map<String, ConfiguredObjectRecord> parents =
+                         Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord);
+                 return new ConfiguredObjectRecordImpl(_defaultExchangeIds.get(name),
+                                                       Exchange.class.getSimpleName(),
+                                                       attributes,
+                                                       parents);
+             }
+         };
+     }
+
+ }
+
+ /*
+  * Visits every configured object record in the given store with a handler which upgrades the
+  * records and then recovers the configured objects from them.
+  */
+ public void perform(DurableConfigurationStore durableConfigurationStore)
+ {
+     durableConfigurationStore.visitConfiguredObjectRecords(
+             new UpgradeAndRecoveryHandler(_virtualHostNode, _objectFactory, durableConfigurationStore, _upgraders));
+ }
+
+ //TODO: generalize this class
+ private static class UpgradeAndRecoveryHandler implements ConfiguredObjectRecordHandler
+ {
+ private static Logger LOGGER = Logger.getLogger(UpgradeAndRecoveryHandler.class);
+
+ private final Map<UUID, ConfiguredObjectRecord> _records = new LinkedHashMap<UUID, ConfiguredObjectRecord>();
+ private Map<String, UpgraderPhaseFactory> _upgraders;
+
+ private final VirtualHostNode<?> _parent;
+ private final ConfiguredObjectFactory _configuredObjectFactory;
+ private final DurableConfigurationStore _store;
+
+ /*
+  * parent is used as the root when recovering objects, durableConfigurationStore receives the
+  * upgraded/deleted records, upgraders maps a model version to the factory upgrading from it.
+  */
+ public UpgradeAndRecoveryHandler(VirtualHostNode<?> parent, ConfiguredObjectFactory configuredObjectFactory, DurableConfigurationStore durableConfigurationStore, Map<String, UpgraderPhaseFactory> upgraders)
+ {
+     _upgraders = upgraders;
+     _store = durableConfigurationStore;
+     _configuredObjectFactory = configuredObjectFactory;
+     _parent = parent;
+ }
+
+ @Override
+ public void begin()
+ {
+ // Intentionally empty: no work is done before the records are visited
+ }
+
+ /* Buffers the record under its id; always returns true. */
+ @Override
+ public boolean handle(final ConfiguredObjectRecord record)
+ {
+     final UUID recordId = record.getId();
+     _records.put(recordId, record);
+     return true;
+ }
+
+ /*
+  * Runs the buffered records through the upgrader chain, writes the resulting updates and deletions
+  * to the store and, if a VirtualHost record is present, recovers the configured objects.
+  */
+ @Override
+ public void end()
+ {
+     final String storedVersion = getCurrentVersion();
+
+     if (LOGGER.isInfoEnabled())
+     {
+         LOGGER.info("Store has model version " + storedVersion + ". Number of record(s) " + _records.size());
+     }
+
+     final DurableConfigurationStoreUpgrader chain = buildUpgraderChain(storedVersion);
+     final Iterator<ConfiguredObjectRecord> stored = _records.values().iterator();
+     while (stored.hasNext())
+     {
+         chain.configuredObject(stored.next());
+     }
+     chain.complete();
+
+     final Map<UUID, ConfiguredObjectRecord> removed = chain.getDeletedRecords();
+     final Map<UUID, ConfiguredObjectRecord> changed = chain.getUpdatedRecords();
+
+     if (LOGGER.isDebugEnabled())
+     {
+         LOGGER.debug("VirtualHost store upgrade: " + removed.size() + " record(s) deleted");
+         LOGGER.debug("VirtualHost store upgrade: " + changed.size() + " record(s) updated");
+         LOGGER.debug("VirtualHost store upgrade: " + _records.size() + " total record(s)");
+     }
+
+     // Persist the outcome of the upgrade: updates first, then removals
+     final ConfiguredObjectRecord[] toUpdate = changed.values().toArray(new ConfiguredObjectRecord[changed.size()]);
+     _store.update(true, toUpdate);
+     final ConfiguredObjectRecord[] toRemove = removed.values().toArray(new ConfiguredObjectRecord[removed.size()]);
+     _store.remove(toRemove);
+
+     // Bring the in-memory view in line with what was just written
+     for (UUID removedId : removed.keySet())
+     {
+         _records.remove(removedId);
+     }
+     _records.putAll(changed);
+
+     final ConfiguredObjectRecord virtualHostRecord = findVirtualHostRecord();
+     if (virtualHostRecord == null)
+     {
+         return;
+     }
+
+     // Re-parent the virtual host record onto the node before resolving the object graph
+     final String parentCategory = _parent.getCategoryClass().getSimpleName();
+     final ConfiguredObjectRecord parentRecord =
+             new ConfiguredObjectRecordImpl(_parent.getId(), parentCategory, Collections.<String, Object>emptyMap());
+     final Map<String, ConfiguredObjectRecord> rootParents =
+             Collections.<String, ConfiguredObjectRecord>singletonMap(parentCategory, parentRecord);
+     final ConfiguredObjectRecord reparented = new ConfiguredObjectRecordImpl(virtualHostRecord.getId(),
+                                                                              VirtualHost.class.getSimpleName(),
+                                                                              virtualHostRecord.getAttributes(),
+                                                                              rootParents);
+     _records.put(virtualHostRecord.getId(), reparented);
+
+     final Collection<ConfiguredObjectRecord> all = _records.values();
+     resolveObjects(_configuredObjectFactory, _parent, all.toArray(new ConfiguredObjectRecord[all.size()]));
+ }
+
+ /* Returns the first buffered record of type "VirtualHost", or null if there is none. */
+ private ConfiguredObjectRecord findVirtualHostRecord()
+ {
+     for (ConfiguredObjectRecord candidate : _records.values())
+     {
+         LOGGER.debug("Found type " + candidate.getType());
+         if ("VirtualHost".equals(candidate.getType()))
+         {
+             return candidate;
+         }
+     }
+     return null;
+ }
+
+ /*
+  * Builds the chain of upgrader phases leading from the given model version to
+  * BrokerModel.MODEL_VERSION, terminated by a NullUpgrader. If the store is already at the current
+  * version the chain consists of the NullUpgrader only.
+  *
+  * Throws IllegalStateException if no upgrader is registered for a version encountered on the way
+  * (including a null version), rather than failing with a NullPointerException.
+  */
+ private DurableConfigurationStoreUpgrader buildUpgraderChain(String version)
+ {
+     DurableConfigurationStoreUpgrader head = null;
+     while(!BrokerModel.MODEL_VERSION.equals(version))
+     {
+         if (LOGGER.isDebugEnabled())
+         {
+             LOGGER.debug("Adding virtual host store upgrader from model version: " + version);
+         }
+         final UpgraderPhaseFactory upgraderPhaseFactory = _upgraders.get(version);
+         if (upgraderPhaseFactory == null)
+         {
+             throw new IllegalStateException("No virtual host store upgrader is registered for model version '"
+                                             + version + "'; cannot upgrade the store to model version "
+                                             + BrokerModel.MODEL_VERSION);
+         }
+         StoreUpgraderPhase upgrader = upgraderPhaseFactory.newInstance();
+         if(head == null)
+         {
+             head = upgrader;
+         }
+         else
+         {
+             // NOTE(review): this relies on setNextUpgrader() appending to the end of the existing
+             // chain rather than replacing head's direct successor - confirm against the upgrader
+             // implementation.
+             head.setNextUpgrader(upgrader);
+         }
+         version = upgraderPhaseFactory.getToVersion();
+     }
+
+     if(head == null)
+     {
+         head = new NullUpgrader();
+     }
+     else
+     {
+         head.setNextUpgrader(new NullUpgrader());
+     }
+
+     return head;
+ }
+
+ private String getCurrentVersion()
+ {
+ for(ConfiguredObjectRecord record : _records.values())
+ {
+ if(record.getType().equals("VirtualHost"))
+ {
+ return (String) record.getAttributes().get(VirtualHost.MODEL_VERSION);
+ }
+ }
+ return BrokerModel.MODEL_VERSION;
+ }
+
+
+ /*
+  * Recovers configured objects from the given records. A record is recovered once all of its
+  * parents have been resolved (root counts as resolved from the start), and the recovered object is
+  * resolved once it reports no unresolved dependencies. Passes are repeated for as long as a pass
+  * resolves at least one object and work remains.
+  *
+  * Throws IllegalArgumentException if, at the end, objects remain with unresolved dependencies or
+  * records remain whose parents were never resolved; throws ServerScopedRuntimeException for a
+  * dependency which is neither an id dependency nor a name dependency.
+  */
+ public void resolveObjects(ConfiguredObjectFactory factory, ConfiguredObject<?> root, ConfiguredObjectRecord... records)
+ {
+     Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>();
+     resolvedObjects.put(root.getId(), root);
+
+     Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records));
+     Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies =
+             new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>();
+
+     boolean updatesMade;
+
+     do
+     {
+         updatesMade = false;
+
+         // Step 1: recover every record whose parents have all been resolved already
+         Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator();
+         while (iter.hasNext())
+         {
+             ConfiguredObjectRecord record = iter.next();
+             Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>();
+             boolean foundParents = true;
+             for (ConfiguredObjectRecord parent : record.getParents().values())
+             {
+                 if (!resolvedObjects.containsKey(parent.getId()))
+                 {
+                     foundParents = false;
+                     break;
+                 }
+                 else
+                 {
+                     parents.add(resolvedObjects.get(parent.getId()));
+                 }
+             }
+             if (foundParents)
+             {
+                 iter.remove();
+                 UnresolvedConfiguredObject<? extends ConfiguredObject> recovered =
+                         factory.recover(record, parents.toArray(new ConfiguredObject<?>[parents.size()]));
+                 Collection<ConfiguredObjectDependency<?>> dependencies =
+                         recovered.getUnresolvedDependencies();
+                 if (dependencies.isEmpty())
+                 {
+                     updatesMade = true;
+                     ConfiguredObject<?> resolved = recovered.resolve();
+                     resolvedObjects.put(resolved.getId(), resolved);
+                 }
+                 else
+                 {
+                     // Parked until its dependencies can be satisfied in step 2
+                     recordsWithUnresolvedDependencies.add(recovered);
+                 }
+             }
+
+         }
+
+         // Step 2: try to satisfy the outstanding dependencies of the parked objects
+         Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter =
+                 recordsWithUnresolvedDependencies.iterator();
+
+         while(unresolvedIter.hasNext())
+         {
+             UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next();
+             // Iterate over a copy of the dependency collection
+             Collection<ConfiguredObjectDependency<?>> dependencies =
+                     new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies());
+
+             for(ConfiguredObjectDependency dependency : dependencies)
+             {
+                 if(dependency instanceof ConfiguredObjectIdDependency)
+                 {
+                     UUID id = ((ConfiguredObjectIdDependency)dependency).getId();
+                     if(resolvedObjects.containsKey(id))
+                     {
+                         dependency.resolve(resolvedObjects.get(id));
+                     }
+                 }
+                 else if(dependency instanceof ConfiguredObjectNameDependency)
+                 {
+                     // Name dependencies are looked up through the parents of the unresolved object
+                     ConfiguredObject<?> dependentObject = null;
+                     for(ConfiguredObject<?> parent : unresolvedObject.getParents())
+                     {
+                         dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName());
+                         if(dependentObject != null)
+                         {
+                             break;
+                         }
+                     }
+                     if(dependentObject != null)
+                     {
+                         dependency.resolve(dependentObject);
+                     }
+                 }
+                 else
+                 {
+                     throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName());
+                 }
+             }
+             if(unresolvedObject.getUnresolvedDependencies().isEmpty())
+             {
+                 updatesMade = true;
+                 unresolvedIter.remove();
+                 ConfiguredObject<?> resolved = unresolvedObject.resolve();
+                 resolvedObjects.put(resolved.getId(), resolved);
+             }
+         }
+
+     } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty()));
+
+     if(!recordsWithUnresolvedDependencies.isEmpty())
+     {
+         throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies);
+     }
+     if(!recordsWithUnresolvedParents.isEmpty())
+     {
+         throw new IllegalArgumentException("Cannot resolve objects because their parents cannot be found: " + recordsWithUnresolvedParents);
+     }
+ }
+
+ }
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1588886&r1=1588885&r2=1588886&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Apr 21 14:28:29 2014
@@ -41,7 +41,6 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -51,7 +50,6 @@ import org.apache.qpid.server.exchange.A
import org.apache.qpid.server.exchange.DefaultDestination;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
@@ -60,9 +58,27 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.IntegrityViolationException;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.SystemContext;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostAlias;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.model.adapter.VirtualHostAliasAdapter;
+import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -74,7 +90,6 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
@@ -131,14 +146,12 @@ public abstract class AbstractVirtualHos
private final List<VirtualHostAlias> _aliases = new ArrayList<VirtualHostAlias>();
private final AtomicBoolean _deleted = new AtomicBoolean();
+ private final VirtualHostNode<?> _virtualHostNode;
@ManagedAttributeField
private Map<String, Object> _messageStoreSettings;
@ManagedAttributeField
- private Map<String, Object> _configurationStoreSettings;
-
- @ManagedAttributeField
private boolean _queue_deadLetterQueueEnabled;
@ManagedAttributeField
@@ -163,12 +176,13 @@ public abstract class AbstractVirtualHos
private String _securityAcl;
private MessageDestination _defaultDestination;
-
- public AbstractVirtualHost(final Map<String, Object> attributes, Broker<?> broker)
+ public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
- super(parentsMap(broker),
- enhanceWithId(attributes), broker.getTaskExecutor());
- _broker = broker;
+ super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(VirtualHostNode.class, virtualHostNode),
+ enhanceWithId(attributes), ((Broker<?>)virtualHostNode.getParent(Broker.class)).getTaskExecutor());
+ _broker = virtualHostNode.getParent(Broker.class);
+ _virtualHostNode = virtualHostNode;
+
_dtxRegistry = new DtxRegistry();
_eventLogger = _broker.getParent(SystemContext.class).getEventLogger();
@@ -209,6 +223,41 @@ public abstract class AbstractVirtualHos
{
throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
}
+
+ DurableConfigurationStore durableConfigurationStore = _virtualHostNode.getConfigurationStore();
+
+ boolean nodeIsMessageStoreProvider = _virtualHostNode.isMessageStoreProvider();
+ if (nodeIsMessageStoreProvider)
+ {
+ if (!(durableConfigurationStore instanceof MessageStore))
+ {
+ throw new IllegalConfigurationException("Virtual host node " + _virtualHostNode.getName()
+ + " is configured as a provider of message store but the MessageStore interface is not implemented on a configuration store of type "
+ + durableConfigurationStore.getClass().getName());
+ }
+ }
+ else
+ {
+ Map<String, Object> messageStoreSettings = getMessageStoreSettings();
+ if (messageStoreSettings == null)
+ {
+ throw new IllegalConfigurationException("Message store settings are missed for VirtualHost " + getName()
+ + ". You can either configure the message store setting on the host or "
+ + (durableConfigurationStore instanceof MessageStore ?
+ " configure VirtualHostNode " + _virtualHostNode.getName() + " as a provider of message store" :
+ " change the node type to one having configuration store implementing the MessageStore inteface") );
+ }
+ String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE);
+ if (storeType == null)
+ {
+ throw new IllegalConfigurationException("Message store type setting is not set");
+ }
+ MessageStoreFactory factory = MessageStoreFactory.FACTORY_LOADER.get(storeType);
+ if (factory == null)
+ {
+ throw new IllegalConfigurationException("Message store factory is not found for type " + storeType + " for VirtualHost " + getName());
+ }
+ }
}
@Override
@@ -224,18 +273,17 @@ public abstract class AbstractVirtualHos
protected void onOpen()
{
super.onOpen();
- _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount());
registerSystemNodes();
initialiseStatistics();
- Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
{
@Override
public Object run()
{
- initialiseStorage(AbstractVirtualHost.this);
+ initialiseStorage();
return null;
}
});
@@ -280,7 +328,7 @@ public abstract class AbstractVirtualHos
}
}
- abstract protected void initialiseStorage(org.apache.qpid.server.model.VirtualHost<?,?,?> virtualHost);
+ abstract protected void initialiseStorage();
protected boolean isStoreEmpty()
{
@@ -547,7 +595,6 @@ public abstract class AbstractVirtualHos
return _houseKeepingTasks.getActiveCount();
}
-
public long getCreateTime()
{
return _createTime;
@@ -620,8 +667,6 @@ public abstract class AbstractVirtualHos
private AMQQueue<?> addQueueWithoutDLQ(Map<String, Object> attributes) throws QueueExistsException
{
-
-
try
{
return (AMQQueue) getObjectFactory().create(Queue.class, attributes, this);
@@ -779,8 +824,6 @@ public abstract class AbstractVirtualHos
throws ExchangeExistsException, ReservedExchangeNameException,
UnknownExchangeException, AMQUnknownExchangeType
{
-
-
try
{
return (ExchangeImpl) getObjectFactory().create(Exchange.class, attributes, this);
@@ -815,6 +858,9 @@ public abstract class AbstractVirtualHos
_state = VirtualHostState.STOPPED;
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
+
+ // TODO: The state work will replace this with closure of the virtualhost, rather than deleting it.
+ deleted();
}
private void closeStorage()
@@ -830,28 +876,11 @@ public abstract class AbstractVirtualHos
_logger.error("Failed to close message store", e);
}
}
- if (getDurableConfigurationStore() != null)
+
+ if (!_virtualHostNode.isMessageStoreProvider())
{
- try
- {
- getDurableConfigurationStore().closeConfigurationStore();
- MessageStoreLogSubject configurationStoreSubject = getConfigurationStoreLogSubject();
- if (configurationStoreSubject != null)
- {
- getEventLogger().message(configurationStoreSubject, ConfigStoreMessages.CLOSE());
- }
- }
- catch (StoreException e)
- {
- _logger.error("Failed to close configuration store", e);
- }
+ getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CLOSED());
}
- getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CLOSED());
- }
-
- protected MessageStoreLogSubject getConfigurationStoreLogSubject()
- {
- return null;
}
public void registerMessageDelivered(long messageSize)
@@ -925,11 +954,6 @@ public abstract class AbstractVirtualHos
return _dtxRegistry;
}
- public String toString()
- {
- return getName();
- }
-
public VirtualHostState getVirtualHostState()
{
return _state;
@@ -1029,22 +1053,6 @@ public abstract class AbstractVirtualHos
_state = state;
}
- protected void attainActivation()
- {
- VirtualHostState finalState = VirtualHostState.ERRORED;
-
- try
- {
- initialiseHouseKeeping(getHousekeepingCheckPeriod());
- finalState = VirtualHostState.ACTIVE;
- }
- finally
- {
- _state = finalState;
- reportIfError(_state);
- }
- }
-
protected void reportIfError(VirtualHostState state)
{
if (state == VirtualHostState.ERRORED)
@@ -1053,22 +1061,6 @@ public abstract class AbstractVirtualHos
}
}
- protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers()
- {
- DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(this),
- new ExchangeRecoverer(this),
- new BindingRecoverer(this)
- };
-
- final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>();
- for(DurableConfiguredObjectRecoverer recoverer : recoverers)
- {
- recovererMap.put(recoverer.getType(), recoverer);
- }
- return recovererMap;
- }
-
private static class IsStoreEmptyHandler implements ConfiguredObjectRecordHandler
{
private boolean _empty = true;
@@ -1349,20 +1341,12 @@ public abstract class AbstractVirtualHos
return _storeTransactionOpenTimeoutWarn;
}
- @SuppressWarnings("unchecked")
@Override
public Map<String, Object> getMessageStoreSettings()
{
return _messageStoreSettings;
}
- @SuppressWarnings("unchecked")
- @Override
- public Map<String, Object> getConfigurationStoreSettings()
- {
- return _configurationStoreSettings;
- }
-
@Override
public long getQueueCount()
{
@@ -1424,6 +1408,7 @@ public abstract class AbstractVirtualHos
{
if (desiredState == State.ACTIVE)
{
+ activate();
return true;
}
else if (desiredState == State.STOPPED)
@@ -1473,7 +1458,6 @@ public abstract class AbstractVirtualHos
return Collections.unmodifiableCollection(_aliases);
}
-
private String createDLQ(final String queueName)
{
final String dlExchangeName = getDeadLetterExchangeName(queueName);
@@ -1609,5 +1593,63 @@ public abstract class AbstractVirtualHos
return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
}
+ /**
+ * Reports the model version of this virtual host, which is always the
+ * {@code BrokerModel.MODEL_VERSION} constant.
+ */
+ @Override
+ public String getModelVersion()
+ {
+ return BrokerModel.MODEL_VERSION;
+ }
+
+ /**
+ * Returns the durable configuration store by delegating to
+ * {@code _virtualHostNode.getConfigurationStore()}; the virtual host does not hold
+ * a configuration store reference of its own.
+ */
+ @Override
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return _virtualHostNode.getConfigurationStore();
+ }
+
+ @Override
+ protected void onCreate()
+ {
+ super.onCreate();
+ getDurableConfigurationStore().create(asObjectRecord());
+ }
+
+ /**
+ * Brings the virtual host into service: creates the housekeeping executor, opens the
+ * message store, creates the default exchanges when the store is empty, recovers the
+ * message store and finally starts housekeeping.
+ *
+ * On return {@code _state} is ACTIVE if housekeeping initialisation completed,
+ * otherwise ERRORED (in which case {@code reportIfError} is invoked).
+ *
+ * NOTE(review): only the housekeeping initialisation is inside the try/finally. An
+ * exception thrown while opening or recovering the message store propagates without
+ * touching {@code _state} - confirm this is intended.
+ */
+ protected void activate()
+ {
+ _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount());
+
+ boolean nodeIsMessageStoreProvider = _virtualHostNode.isMessageStoreProvider();
+
+ MessageStore messageStore = getMessageStore();
+ Map<String, Object> messageStoreSettings = getMessageStoreSettings();
+ // Never hand a null settings map to the store; substitute an empty one.
+ if (messageStoreSettings == null)
+ {
+ messageStoreSettings = Collections.emptyMap();
+ }
+ messageStore.openMessageStore(this, messageStoreSettings);
+
+ // The message store CREATED / STORE_LOCATION operational log messages are emitted
+ // here only when the virtual host node is not itself the message store provider.
+ if (!nodeIsMessageStoreProvider)
+ {
+ getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CREATED());
+ getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.STORE_LOCATION(messageStore.getStoreLocation()));
+ }
+
+ if (isStoreEmpty())
+ {
+ createDefaultExchanges();
+ }
+
+ new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
+
+ // Assume failure until housekeeping has been initialised successfully.
+ VirtualHostState finalState = VirtualHostState.ERRORED;
+ try
+ {
+ initialiseHouseKeeping(getHousekeepingCheckPeriod());
+ finalState = VirtualHostState.ACTIVE;
+ }
+ finally
+ {
+ _state = finalState;
+ reportIfError(_state);
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java?rev=1588886&r1=1588885&r2=1588886&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java Mon Apr 21 14:28:29 2014
@@ -21,163 +21,89 @@
package org.apache.qpid.server.virtualhost;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.plugin.MessageStoreFactory;
-import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
@ManagedObject( category = false, type = "STANDARD")
public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost>
{
- public static final String TYPE = "STANDARD";
- private MessageStore _messageStore;
- private DurableConfigurationStore _durableConfigurationStore;
+ public static final String TYPE = "STANDARD";
+ MessageStore _messageStore;
private MessageStoreLogSubject _messageStoreLogSubject;
- private MessageStoreLogSubject _configurationStoreLogSubject;
-
- public StandardVirtualHost(final Map<String, Object> attributes, Broker<?> broker)
+ public StandardVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
- super(attributes, broker);
+ super(attributes, virtualHostNode);
}
@Override
public void validate()
{
super.validate();
- Map<String,Object> attributes = getActualAttributes();
- Map<String, Object> messageStoreSettings = getMessageStoreSettings();
- if (messageStoreSettings == null)
- {
- throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required.");
- }
-
- Object storeType = messageStoreSettings.get(MessageStore.STORE_TYPE);
- // need store type and path
- Collection<String> knownTypes = MessageStoreFactory.FACTORY_LOADER.getSupportedTypes();
- if (storeType == null)
+ VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class);
+ if (!virtualHostNode.isMessageStoreProvider())
{
- throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
- +"' is required in attribute " + org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + ". Known types are : " + knownTypes);
- }
- else if (!(storeType instanceof String))
- {
- throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
- +"' is required and must be of type String. "
- +"Known types are : " + knownTypes);
- }
-
- MessageStoreFactory factory = MessageStoreFactory.FACTORY_LOADER.get((String)storeType);
- if(factory == null)
- {
- throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
- +"' has value '" + storeType + "' which is not one of the valid values: "
- + "Known types are : " + knownTypes);
- }
+ Map<String,Object> attributes = getActualAttributes();
+ Map<String, Object> messageStoreSettings = getMessageStoreSettings();
+ if (messageStoreSettings == null)
+ {
+ throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required.");
+ }
- factory.validateAttributes(attributes);
+ Object storeType = messageStoreSettings.get(MessageStore.STORE_TYPE);
+ // need store type and path
+ Collection<String> knownTypes = MessageStoreFactory.FACTORY_LOADER.getSupportedTypes();
+ if (storeType == null)
+ {
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
+ +"' is required in attribute " + org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + ". Known types are : " + knownTypes);
+ }
+ else if (!(storeType instanceof String))
+ {
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
+ +"' is required and must be of type String. "
+ +"Known types are : " + knownTypes);
+ }
- }
+ MessageStoreFactory factory = MessageStoreFactory.FACTORY_LOADER.get((String)storeType);
+ if(factory == null)
+ {
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
+ +"' has value '" + storeType + "' which is not one of the valid values: "
+ + "Known types are : " + knownTypes);
+ }
- private DurableConfigurationStore initialiseConfigurationStore(String storeType)
- {
- DurableConfigurationStore configurationStore;
-
- if(storeType != null)
- {
- configurationStore = new DurableConfigurationStoreCreator().createMessageStore(storeType);
- }
- else if(getMessageStore() instanceof DurableConfigurationStore)
- {
- configurationStore = (DurableConfigurationStore) getMessageStore();
- }
- else
- {
- throw new ClassCastException(getMessageStore().getClass().getSimpleName() +
- " is not an instance of DurableConfigurationStore");
+ factory.validateAttributes(attributes);
}
- return configurationStore;
}
@Override
- protected void initialiseStorage(VirtualHost virtualHost)
+ protected void initialiseStorage()
{
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
- String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE);
- _messageStore = MessageStoreFactory.FACTORY_LOADER.get(storeType).createMessageStore();
- _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
- getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED());
-
- Map<String, Object> configurationStoreSettings = virtualHost.getConfigurationStoreSettings();
- String configurationStoreType = configurationStoreSettings == null
- ? null
- : (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE);
- _durableConfigurationStore = initialiseConfigurationStore(configurationStoreType);
- boolean combinedStores = _durableConfigurationStore == _messageStore;
- if (combinedStores)
+ VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class);
+ if (virtualHostNode.isMessageStoreProvider())
{
- configurationStoreSettings = new HashMap<String, Object>(messageStoreSettings);
- configurationStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true);
- }
-
- if (!combinedStores)
- {
- _configurationStoreLogSubject =
- new MessageStoreLogSubject(getName(), _durableConfigurationStore.getClass().getSimpleName());
- getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.CREATED());
- }
-
- _durableConfigurationStore.openConfigurationStore(virtualHost, configurationStoreSettings);
-
- _messageStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings());
-
- getEventLogger().message(_messageStoreLogSubject,
- MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
-
- if (_configurationStoreLogSubject != null)
- {
- getEventLogger().message(_configurationStoreLogSubject,
- ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
- getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_START());
- }
-
-
- if (isStoreEmpty())
- {
- createDefaultExchanges();
+ _messageStore = (MessageStore)virtualHostNode.getConfigurationStore();
}
else
{
- ConfiguredObjectRecordHandler upgraderRecoverer =
- new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
- _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer);
- }
-
- if (_configurationStoreLogSubject != null)
- {
- getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+ Map<String, Object> messageStoreSettings = getMessageStoreSettings();
+ String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE);
+ _messageStore = MessageStoreFactory.FACTORY_LOADER.get(storeType).createMessageStore();
}
- new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
-
- attainActivation();
+ _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
}
@Override
@@ -187,21 +113,9 @@ public class StandardVirtualHost extends
}
@Override
- public DurableConfigurationStore getDurableConfigurationStore()
- {
- return _durableConfigurationStore;
- }
-
- @Override
protected MessageStoreLogSubject getMessageStoreLogSubject()
{
return _messageStoreLogSubject;
}
- @Override
- protected MessageStoreLogSubject getConfigurationStoreLogSubject()
- {
- return _configurationStoreLogSubject;
- }
-
}
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java?rev=1588886&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java Mon Apr 21 14:28:29 2014
@@ -0,0 +1,354 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.io.File;
+import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.SystemContext;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
+import org.apache.qpid.server.virtualhost.StandardVirtualHost;
+
+public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandardVirtualHostNode<X>> extends AbstractConfiguredObject<X>
+ implements VirtualHostNode<X>
+{
+ private static final Logger LOGGER = Logger.getLogger(AbstractStandardVirtualHostNode.class);
+
+ private final Broker<?> _broker;
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIALISING);
+ private final EventLogger _eventLogger;
+
+ @ManagedAttributeField
+ private boolean _messageStoreProvider;
+
+ private MessageStoreLogSubject _configurationStoreLogSubject;
+ private DurableConfigurationStore _durableConfigurationStore;
+
+ /**
+ * Creates the node with the given broker as its only parent and looks up the event
+ * logger from the broker's {@code SystemContext} parent.
+ *
+ * @param parent the broker this node belongs to
+ * @param attributes attribute values handed to the superclass constructor
+ * @param taskExecutor task executor handed to the superclass constructor
+ */
+ @SuppressWarnings("rawtypes")
+ public AbstractStandardVirtualHostNode(Broker<?> parent, Map<String, Object> attributes, TaskExecutor taskExecutor)
+ {
+ super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(Broker.class, parent),
+ attributes, taskExecutor);
+ _broker = parent;
+ _eventLogger = parent.getParent(SystemContext.class).getEventLogger();
+ }
+
+ /**
+ * Validates this node's attributes as configuration store settings: a copy of the
+ * actual attributes, with the store type set to the factory's type, is passed to
+ * the configuration store factory for validation.
+ */
+ @Override
+ public void validate()
+ {
+ super.validate();
+ final DurableConfigurationStoreFactory factory = getDurableConfigurationStoreFactory();
+ // Work on a copy so the node's own attribute map is not modified.
+ final Map<String, Object> settings = new HashMap<String, Object>(getActualAttributes());
+ final String factoryType = factory.getType();
+ settings.put(DurableConfigurationStore.STORE_TYPE, factoryType);
+ factory.validateConfigurationStoreSettings(settings);
+ }
+
+ /**
+ * Creates the durable configuration store through the factory and builds the log
+ * subject (node name plus store class simple name) used for configuration store
+ * operational log messages. The store is created here but not opened.
+ */
+ @Override
+ public void onOpen()
+ {
+ super.onOpen();
+ _durableConfigurationStore = getDurableConfigurationStoreFactory().createDurableConfigurationStore();
+ final String nodeName = getName();
+ final String storeClassName = _durableConfigurationStore.getClass().getSimpleName();
+ _configurationStoreLogSubject = new MessageStoreLogSubject(nodeName, storeClassName);
+
+ }
+
+ /**
+ * Supplies the configuration store factory for this node type. Within this class it
+ * is used by {@code validate()} to check the store settings and by {@code onOpen()}
+ * to create the configuration store.
+ */
+ protected abstract DurableConfigurationStoreFactory getDurableConfigurationStoreFactory();
+
+ protected Map<String, Object> getDefaultMessageStoreSettings()
+ {
+ // TODO perhaps look for the MS with the default annotation and associated default.
+ Map<String, Object> settings = new HashMap<String, Object>();
+ settings.put(MessageStore.STORE_TYPE, "DERBY");
+ settings.put(MessageStore.STORE_PATH, "${qpid.work_dir}" + File.separator + "derbystore" + File.separator + getName());
+ return settings;
+ }
+
+ /**
+ * Creates a child object. {@code VirtualHost} children are built through the object
+ * factory with this node as parent; every other child class is delegated to the
+ * superclass.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes,
+ ConfiguredObject... otherParents)
+ {
+ if(childClass != VirtualHost.class)
+ {
+ return super.addChild(childClass, attributes, otherParents);
+ }
+ return (C) getObjectFactory().create(VirtualHost.class, attributes, this);
+ }
+
+
+
+ /**
+ * Returns the parent of the requested category. A request for {@code Broker} is
+ * answered directly from the broker reference captured at construction; anything
+ * else is resolved by the superclass.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public <T extends ConfiguredObject> T getParent(Class<T> clazz)
+ {
+ return clazz == Broker.class ? (T) _broker : super.getParent(clazz);
+ }
+
+
+ /** Returns the node's current state as held in the {@code _state} atomic reference. */
+ @Override
+ public State getState()
+ {
+ return _state.get();
+ }
+
+ /** Always reports {@code LifetimePolicy.PERMANENT}. */
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return LifetimePolicy.PERMANENT;
+ }
+
+ /**
+ * Attempts to move the node to {@code desiredState}.
+ *
+ * The {@code currentState} argument is not consulted; the transition is decided from
+ * the value read from the {@code _state} atomic reference, and the state is switched
+ * with compare-and-set before the corresponding action runs.
+ *
+ * DELETED: allowed from INITIALISING, ACTIVE, STOPPED or ERRORED, then {@code delete()}.
+ * ACTIVE: allowed from INITIALISING or STOPPED, then {@code activate()}.
+ * STOPPED: attempted from whatever state was read, then {@code stop()}.
+ *
+ * @return true if the transition was performed; false for any other desired state,
+ * or when the DELETED compare-and-set does not succeed
+ * @throws IllegalStateException if the transition is not permitted from the state read
+ * @throws RuntimeException rethrown from {@code activate()} when the broker is not in
+ * management mode
+ */
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ State state = _state.get();
+ if (desiredState == State.DELETED)
+ {
+ if (state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.ERRORED)
+ {
+ if( _state.compareAndSet(state, State.DELETED))
+ {
+ delete();
+ return true;
+ }
+ // A failed compare-and-set falls through to the final "return false".
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot delete virtual host node in " + state + " state");
+ }
+ }
+ else if (desiredState == State.ACTIVE)
+ {
+ if ((state == State.INITIALISING || state == State.STOPPED) && _state.compareAndSet(state, State.ACTIVE))
+ {
+ try
+ {
+ activate();
+ }
+ catch(RuntimeException e)
+ {
+ // Activation failed: record ERRORED (only if the state is still ACTIVE).
+ _state.compareAndSet(State.ACTIVE, State.ERRORED);
+ // In management mode the failure is logged and the call still returns
+ // true; otherwise the exception is propagated to the caller.
+ if (_broker.isManagementMode())
+ {
+ LOGGER.warn("Failed to make " + this + " active.", e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ return true;
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot activate virtual host node in " + state + " state");
+ }
+ }
+ else if (desiredState == State.STOPPED)
+ {
+ // NOTE(review): unlike the other branches there is no check on the source state,
+ // so stop() also runs when the node is already STOPPED or DELETED - confirm intended.
+ if (_state.compareAndSet(state, State.STOPPED))
+ {
+ stop();
+ return true;
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot stop virtual host node in " + state + " state");
+ }
+ }
+ return false;
+ }
+
+
+ /**
+ * Returns the value of the {@code _messageStoreProvider} managed attribute field.
+ * Within this class, a false value causes default message store settings to be added
+ * to a newly created virtual host during activation.
+ */
+ @Override
+ public boolean isMessageStoreProvider()
+ {
+ return _messageStoreProvider;
+ }
+
+ @Override
+ public VirtualHost<?,?,?> getVirtualHost()
+ {
+ Collection<VirtualHost> children = getChildren(VirtualHost.class);
+ if (children.size() == 0)
+ {
+ return null;
+ }
+ else if (children.size() == 1)
+ {
+ return children.iterator().next();
+ }
+ else
+ {
+ throw new IllegalStateException(this + " has an unexpected number of virtualhost children, size " + children.size());
+ }
+ }
+
+ /**
+ * Returns the durable configuration store of this node. The field is assigned in
+ * {@code onOpen()}, so the result is null before that hook has run.
+ */
+ @Override
+ public DurableConfigurationStore getConfigurationStore()
+ {
+ return _durableConfigurationStore;
+ }
+
+ /**
+ * Activates the node: opens the configuration store with this node's attributes,
+ * runs the store upgrader/recoverer, ensures a virtual host child exists (creating a
+ * STANDARD one named after the node if none is found, otherwise opening the existing
+ * one) and finally requests the ACTIVE state on that virtual host.
+ *
+ * Configuration store operational log messages (CREATED, STORE_LOCATION,
+ * RECOVERY_START, RECOVERY_COMPLETE) are emitted along the way.
+ */
+ private void activate()
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Activating virtualhost node " + this);
+ }
+
+ Map<String, Object> attributes = buildAttributesForStore();
+
+ _durableConfigurationStore.openConfigurationStore(this, attributes);
+
+ _eventLogger.message(_configurationStoreLogSubject, ConfigStoreMessages.CREATED());
+
+ // Only file based nodes expose a store path, so only they log a store location.
+ if (this instanceof FileBasedVirtualHostNode)
+ {
+ @SuppressWarnings("rawtypes")
+ FileBasedVirtualHostNode fileBasedVirtualHostNode = (FileBasedVirtualHostNode) this;
+ _eventLogger.message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(fileBasedVirtualHostNode.getStorePath()));
+ }
+
+ _eventLogger.message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_START());
+
+ VirtualHostStoreUpgraderAndRecoverer upgrader = new VirtualHostStoreUpgraderAndRecoverer(this, getObjectFactory());
+ upgrader.perform(_durableConfigurationStore);
+
+ _eventLogger.message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+
+ // Looked up after the upgrader/recoverer has run; presumably a host recorded in
+ // the store has been recovered as a child by this point - TODO confirm.
+ VirtualHost<?,?,?> host = getVirtualHost();
+
+ if (host == null)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Creating new virtualhost with name : " + getName());
+ }
+ Map<String, Object> hostAttributes = new HashMap<String, Object>();
+ hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
+ hostAttributes.put(VirtualHost.NAME, getName());
+ hostAttributes.put(VirtualHost.TYPE, StandardVirtualHost.TYPE);
+ // Default message store settings are supplied only when this node does not
+ // provide the message store itself.
+ if (!isMessageStoreProvider())
+ {
+ hostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, getDefaultMessageStoreSettings());
+ }
+ host = createChild(VirtualHost.class, hostAttributes);
+ }
+ else
+ {
+ // Final copy so the anonymous PrivilegedAction can capture the host.
+ final VirtualHost<?,?,?> recoveredHost = host;
+ // The existing host is opened as the subject with added system rights.
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ recoveredHost.open();
+ return null;
+ }
+ });
+ }
+
+ host.setDesiredState(host.getState(), State.ACTIVE);
+ }
+
+ /**
+ * Collects the current value of every attribute of this node into a new map, reading
+ * the attributes as the subject with added system rights.
+ *
+ * @return a new mutable map from attribute name to attribute value
+ */
+ private Map<String, Object> buildAttributesForStore()
+ {
+ return Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Map<String, Object>>()
+ {
+ @Override
+ public Map<String, Object> run()
+ {
+ final Map<String, Object> collected = new HashMap<String, Object>();
+ for (String attributeName : getAttributeNames())
+ {
+ collected.put(attributeName, getAttribute(attributeName));
+ }
+ return collected;
+ }
+ });
+ }
+
+ private void delete()
+ {
+ VirtualHost<?, ?, ?> virtualHost = getVirtualHost();
+ if (virtualHost != null)
+ {
+ virtualHost.setDesiredState(virtualHost.getState(), State.DELETED);
+ }
+ //TODO: this needs to be called from parent
+ deleted();
+
+ // TODO Split onDelete into deleteMessageStore/deleteConfigStore
+ if (_durableConfigurationStore instanceof MessageStore)
+ {
+ ((MessageStore)_durableConfigurationStore).onDelete();
+ }
+
+ }
+
+ /**
+ * Stops the node: requests STOPPED on the virtual host child (if there is one) and
+ * closes the configuration store, logging the configuration store CLOSE message.
+ *
+ * The store is closed in a finally block. By the time this method runs,
+ * {@code setState} has already switched the node to STOPPED, so if stopping the
+ * virtual host throws, the store must still be closed; otherwise it would stay open
+ * behind a node that reports itself as stopped. The exception from stopping the
+ * virtual host still propagates to the caller.
+ */
+ private void stop()
+ {
+ try
+ {
+ VirtualHost<?, ?, ?> virtualHost = getVirtualHost();
+ if (virtualHost != null)
+ {
+ virtualHost.setDesiredState(virtualHost.getState(), State.STOPPED);
+ }
+ }
+ finally
+ {
+ _durableConfigurationStore.closeConfigurationStore();
+
+ _eventLogger.message(_configurationStoreLogSubject, ConfigStoreMessages.CLOSE());
+ }
+ }
+
+
+}
Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/FileBasedVirtualHostNode.java (from r1588885, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/FileBasedVirtualHostNode.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/FileBasedVirtualHostNode.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java&r1=1588885&r2=1588886&rev=1588886&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/FileBasedVirtualHostNode.java Mon Apr 21 14:28:29 2014
@@ -18,15 +18,16 @@
* under the License.
*
*/
-package org.apache.qpid.server.store;
+package org.apache.qpid.server.virtualhostnode;
-import java.util.Map;
-import java.util.UUID;
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.VirtualHostNode;
-public interface DurableConfiguredObjectRecoverer
+public interface FileBasedVirtualHostNode<X extends FileBasedVirtualHostNode<X>> extends VirtualHostNode<X>
{
- public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer,
- final ConfiguredObjectRecord record);
+ public static final String STORE_PATH = "storePath";
+
+ @ManagedAttribute(automate = true, mandatory = true)
+ public String getStorePath();
- public String getType();
}
Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/JsonVirtualHostNodeFactory.java (from r1588885, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/JsonVirtualHostNodeFactory.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/JsonVirtualHostNodeFactory.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java&r1=1588885&r2=1588886&rev=1588886&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/JsonVirtualHostNodeFactory.java Mon Apr 21 14:28:29 2014
@@ -18,31 +18,26 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Map;
import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-public class BDBHAVirtualHostFactory extends AbstractConfiguredObjectTypeFactory<BDBHAVirtualHost>
+public class JsonVirtualHostNodeFactory extends AbstractConfiguredObjectTypeFactory<JsonVirtualHostNodeImpl>
{
-
- public BDBHAVirtualHostFactory()
+ public JsonVirtualHostNodeFactory()
{
- super(BDBHAVirtualHost.class);
+ super(JsonVirtualHostNodeImpl.class);
}
@Override
- public BDBHAVirtualHost createInstance(final Map<String, Object> attributes,
- final ConfiguredObject<?>... parents)
+ public JsonVirtualHostNodeImpl createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents)
{
- final Broker broker = getParent(Broker.class, parents);
- return new BDBHAVirtualHost(attributes, broker);
+ Broker<?> parent = getParent(Broker.class, parents);
+ return new JsonVirtualHostNodeImpl(parent, attributes, parent.getTaskExecutor());
}
-
}
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/JsonVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/JsonVirtualHostNodeImpl.java?rev=1588886&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/JsonVirtualHostNodeImpl.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/JsonVirtualHostNodeImpl.java Mon Apr 21 14:28:29 2014
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Map;
+
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
+import org.apache.qpid.server.store.JsonFileConfigStoreFactory;
+
+/**
+ * Virtual host node of type "JSON": its configuration store is created by a
+ * {@link JsonFileConfigStoreFactory} and it exposes the {@code storePath} attribute
+ * required by {@link FileBasedVirtualHostNode}.
+ */
+@ManagedObject(category=false, type="JSON")
+public class JsonVirtualHostNodeImpl extends AbstractStandardVirtualHostNode<JsonVirtualHostNodeImpl> implements FileBasedVirtualHostNode<JsonVirtualHostNodeImpl>
+{
+ /** Backing field of the storePath managed attribute. */
+ @ManagedAttributeField
+ private String _storePath;
+
+ /**
+ * @param parent the broker this node belongs to
+ * @param attributes attribute values passed to the superclass
+ * @param taskExecutor task executor passed to the superclass
+ */
+ public JsonVirtualHostNodeImpl(Broker<?> parent, Map<String, Object> attributes, TaskExecutor taskExecutor)
+ {
+ super(parent, attributes, taskExecutor);
+ }
+
+ /** Returns a new {@link JsonFileConfigStoreFactory} on every call. */
+ @Override
+ protected DurableConfigurationStoreFactory getDurableConfigurationStoreFactory()
+ {
+ final DurableConfigurationStoreFactory jsonFactory = new JsonFileConfigStoreFactory();
+ return jsonFactory;
+ }
+
+ /** Returns the value of the storePath attribute. */
+ @Override
+ public String getStorePath()
+ {
+ return _storePath;
+ }
+
+ /** Renders the simple class name followed by the id, name and store path. */
+ @Override
+ public String toString()
+ {
+ final StringBuilder text = new StringBuilder(getClass().getSimpleName());
+ text.append(" [id=").append(getId());
+ text.append(", name=").append(getName());
+ text.append(", storePath=").append(getStorePath());
+ text.append("]");
+ return text.toString();
+ }
+
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory?rev=1588886&r1=1588885&r2=1588886&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory Mon Apr 21 14:28:29 2014
@@ -45,6 +45,5 @@ org.apache.qpid.server.exchange.FanoutEx
org.apache.qpid.server.exchange.HeadersExchangeFactory
org.apache.qpid.server.exchange.TopicExchangeFactory
org.apache.qpid.server.binding.BindingFactory
-
-
+org.apache.qpid.server.virtualhostnode.JsonVirtualHostNodeFactory
Modified: qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json?rev=1588886&r1=1588885&r2=1588886&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json Mon Apr 21 14:28:29 2014
@@ -21,7 +21,7 @@
{
"name": "${broker.name}",
"storeVersion": 1,
- "modelVersion": "1.4",
+ "modelVersion": "2.0",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
"name" : "passwordFile",
@@ -52,13 +52,10 @@
"authenticationProvider" : "passwordFile",
"protocols" : [ "JMX_RMI" ]
}],
- "virtualhosts" : [ {
+ "virtualhostnodes" : [ {
"name" : "default",
- "type" : "STANDARD",
- "messageStoreSettings" : {
- "storeType" : "DERBY",
- "storePath" : "${qpid.work_dir}/derbystore/default"
- }
+ "type" : "JSON",
+ "storePath" : "${qpid.work_dir}/json/default"
} ],
"plugins" : [ {
"type" : "MANAGEMENT-HTTP",
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java?rev=1588886&r1=1588885&r2=1588886&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java Mon Apr 21 14:28:29 2014
@@ -23,8 +23,6 @@ package org.apache.qpid.server.configura
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -49,11 +47,8 @@ import org.apache.qpid.server.model.Grou
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.SystemContextImpl;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
public class BrokerRecovererTest extends TestCase
@@ -61,8 +56,6 @@ public class BrokerRecovererTest extends
private ConfiguredObjectRecord _brokerEntry = mock(ConfiguredObjectRecord.class);
private UUID _brokerId = UUID.randomUUID();
- private Map<String, Collection<ConfiguredObjectRecord>> _brokerEntryChildren = new HashMap<String, Collection<ConfiguredObjectRecord>>();
- private ConfiguredObjectRecord _authenticationProviderEntry1;
private AuthenticationProvider _authenticationProvider1;
private UUID _authenticationProvider1Id = UUID.randomUUID();
private SystemContext _systemContext;
@@ -93,8 +86,6 @@ public class BrokerRecovererTest extends
_authenticationProvider1 = mock(AuthenticationProvider.class);
when(_authenticationProvider1.getName()).thenReturn("authenticationProvider1");
when(_authenticationProvider1.getId()).thenReturn(_authenticationProvider1Id);
- _authenticationProviderEntry1 = mock(ConfiguredObjectRecord.class);
- _brokerEntryChildren.put(AuthenticationProvider.class.getSimpleName(), Arrays.asList(_authenticationProviderEntry1));
}
@Override
@@ -140,37 +131,6 @@ public class BrokerRecovererTest extends
}
}
- public void testCreateBrokerWithVirtualHost()
- {
- final ConfiguredObjectRecord virtualHostEntry = mock(ConfiguredObjectRecord.class);
-
- String typeName = VirtualHost.class.getSimpleName();
- when(virtualHostEntry.getType()).thenReturn(typeName);
- _brokerEntryChildren.put(typeName, Arrays.asList(virtualHostEntry));
-
- UUID vhostId = UUID.randomUUID();
- _systemContext.resolveObjects(_brokerEntry, createVhostRecord(vhostId));
- Broker<?> broker = _systemContext.getBroker();
-
- assertNotNull(broker);
- broker.open();
- assertEquals(_brokerId, broker.getId());
- assertEquals(1, broker.getVirtualHosts().size());
- assertEquals(vhostId, broker.getVirtualHosts().iterator().next().getId());
-
- }
-
- public ConfiguredObjectRecord createVhostRecord(UUID id)
- {
- final Map<String, Object> vhostAttributes = new HashMap<String, Object>();
- vhostAttributes.put(VirtualHost.NAME, "vhost");
- vhostAttributes.put(VirtualHost.TYPE, "STANDARD");
- vhostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE,
- TestMemoryMessageStore.TYPE));
- return new ConfiguredObjectRecordImpl(id, VirtualHost.class.getSimpleName(), vhostAttributes, Collections
- .singletonMap(Broker.class.getSimpleName(), _brokerEntry));
- }
-
public ConfiguredObjectRecord createAuthProviderRecord(UUID id, String name)
{
final Map<String, Object> authProviderAttrs = new HashMap<String, Object>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org