You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/04/03 21:59:01 UTC
svn commit: r1584365 [9/15] - in
/qpid/branches/java-broker-config-store-changes/qpid/java: ./
bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/jav...
Added: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ResolvedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ResolvedObject.java?rev=1584365&view=auto
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ResolvedObject.java (added)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ResolvedObject.java Thu Apr 3 19:58:53 2014
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Model;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class ResolvedObject<C extends ConfiguredObject<C>> implements UnresolvedConfiguredObject<C>
+{
+
+ private final C _resolved;
+
+
+ private ResolvedObject(final C resolved)
+ {
+ _resolved = resolved;
+ }
+
+ @Override
+ public ConfiguredObject<?>[] getParents()
+ {
+ final Collection<Class<? extends ConfiguredObject>> parentTypes =
+ Model.getInstance().getParentTypes(_resolved.getCategoryClass());
+ ConfiguredObject<?>[] parents = new ConfiguredObject[parentTypes.size()];
+ int i = 0;
+ for(Class<? extends ConfiguredObject> parentType : parentTypes)
+ {
+ parents[i] = _resolved.getParent(parentType);
+ i++;
+ }
+ return parents;
+ }
+
+ @Override
+ public Collection<ConfiguredObjectDependency<?>> getUnresolvedDependencies()
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public C resolve()
+ {
+ return _resolved;
+ }
+
+ public static <T extends ConfiguredObject<T>> ResolvedObject<T> newInstance(T object)
+ {
+ return new ResolvedObject<T>(object);
+ }
+}
Added: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedConfiguredObject.java?rev=1584365&view=auto
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedConfiguredObject.java (added)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedConfiguredObject.java Thu Apr 3 19:58:53 2014
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public interface UnresolvedConfiguredObject<X extends ConfiguredObject<X>>
+{
+ ConfiguredObject<?>[] getParents();
+
+ Collection<ConfiguredObjectDependency<?>> getUnresolvedDependencies();
+
+ X resolve();
+
+}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java Thu Apr 3 19:58:53 2014
@@ -321,21 +321,7 @@ public class MapValueConverter
else if (typeObject instanceof ParameterizedType)
{
ParameterizedType parameterizedType= (ParameterizedType)typeObject;
- Type type = parameterizedType.getRawType();
- if (type == Set.class)
- {
- Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
- if (actualTypeArguments.length != 1)
- {
- throw new IllegalArgumentException("Set type argument is not specified");
- }
- Class<?> classObject = (Class<?>)actualTypeArguments[0];
- value = toSet(rawValue, classObject, attributeName);
- }
- else
- {
- throw new IllegalArgumentException("Conversion into " + parameterizedType + " is not yet supported");
- }
+ value = convertParameterizedType(rawValue, parameterizedType, attributeName);
}
else
{
@@ -352,6 +338,62 @@ public class MapValueConverter
return attributes;
}
+ private static Object convertParameterizedType(Object rawValue, ParameterizedType parameterizedType, String attributeName)
+ {
+ Type type = parameterizedType.getRawType();
+ Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
+ Object convertedValue;
+ if (type == Set.class)
+ {
+ if (actualTypeArguments.length != 1)
+ {
+ throw new IllegalArgumentException("Unexpected number of Set type arguments " + actualTypeArguments.length);
+ }
+ Class<?> classObject = (Class<?>)actualTypeArguments[0];
+ convertedValue = toSet(rawValue, classObject, attributeName);
+ }
+ else if (type == Map.class)
+ {
+ if (actualTypeArguments.length != 2)
+ {
+ throw new IllegalArgumentException("Unexpected number of Map type arguments " + actualTypeArguments.length);
+ }
+ Class<?> keyClassObject = (Class<?>)actualTypeArguments[0];
+ Class<?> valueClassObject = (Class<?>)actualTypeArguments[1];
+ convertedValue = toMap(rawValue, keyClassObject, valueClassObject, attributeName);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Conversion into " + parameterizedType + " is not yet supported");
+ }
+ return convertedValue;
+ }
+
+ private static <K,V> Map<K, V> toMap(Object rawValue, Class<K> keyClassObject, Class<V> valueClassObject, String attributeName)
+ {
+ if (rawValue == null)
+ {
+ return null;
+ }
+ if (rawValue instanceof Map)
+ {
+ Map<K, V> convertedMap = new HashMap<K, V>();
+ Map<?, ?> rawMap = (Map<?,?>)rawValue;
+
+ for (Map.Entry<?, ?> entry : rawMap.entrySet())
+ {
+ K convertedKey = convert(entry.getKey(), keyClassObject, attributeName + " (map key)");
+ V convertedValue = convert(entry.getValue(), valueClassObject, attributeName + " (map value)");
+ convertedMap.put(convertedKey, convertedValue);
+ }
+ return convertedMap;
+ }
+ else
+ {
+ throw new IllegalArgumentException("rawValue is not of unexpected type Map, was : " + rawValue.getClass());
+ }
+ }
+
public static <T> Set<T> toSet(Object rawValue, Class<T> setItemClass, String attributeName)
{
if (rawValue == null)
@@ -361,7 +403,7 @@ public class MapValueConverter
HashSet<T> set = new HashSet<T>();
if (rawValue instanceof Iterable)
{
- Iterable<?> iterable = (Iterable<?>)rawValue;
+ Iterable<?> iterable = (Iterable<?>)rawValue;
for (Object object : iterable)
{
T converted = convert(object, setItemClass, attributeName);
@@ -409,6 +451,10 @@ public class MapValueConverter
{
value = toEnum(attributeName, rawValue, (Class<Enum>) classObject);
}
+ else if (classObject == Object.class)
+ {
+ value = rawValue;
+ }
else
{
throw new IllegalArgumentException("Cannot convert '" + rawValue + "' of type '" + rawValue.getClass()
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Apr 3 19:58:53 2014
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@@ -32,25 +31,23 @@ import java.util.concurrent.ScheduledFut
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.configuration.ExchangeConfiguration;
-import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
@@ -73,9 +70,9 @@ import org.apache.qpid.server.store.Dura
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
{
@@ -97,8 +94,6 @@ public abstract class AbstractVirtualHos
private final SecurityManager _securityManager;
- private final VirtualHostConfiguration _vhostConfig;
-
private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
@@ -127,27 +122,14 @@ public abstract class AbstractVirtualHos
private final EventLogger _eventLogger;
-
public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig,
org.apache.qpid.server.model.VirtualHost virtualHost)
{
- if (hostConfig == null)
- {
- throw new IllegalArgumentException("HostConfig cannot be null");
- }
-
- if (hostConfig.getName() == null || hostConfig.getName().length() == 0)
- {
- throw new IllegalArgumentException("Illegal name (" + hostConfig.getName() + ") for virtualhost.");
- }
-
_virtualHostRegistry = virtualHostRegistry;
_brokerStatisticsGatherer = brokerStatisticsGatherer;
- _vhostConfig = hostConfig;
- _name = _vhostConfig.getName();
+ _name = virtualHost.getName();
_dtxRegistry = new DtxRegistry();
_model = virtualHost;
_eventLogger = virtualHostRegistry.getEventLogger();
@@ -161,7 +143,7 @@ public abstract class AbstractVirtualHos
_connectionRegistry = new ConnectionRegistry();
_connectionRegistry.addRegistryChangeListener(this);
- _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
+ _houseKeepingTasks = new ScheduledThreadPoolExecutor(virtualHost.getHouseKeepingThreadCount());
_queueRegistry = new DefaultQueueRegistry(this);
@@ -176,7 +158,7 @@ public abstract class AbstractVirtualHos
initialiseStatistics();
- initialiseStorage(hostConfig, virtualHost);
+ initialiseStorage(virtualHost);
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
@@ -192,19 +174,15 @@ public abstract class AbstractVirtualHos
}
}
- abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig,
- org.apache.qpid.server.model.VirtualHost virtualHost);
+ abstract protected void initialiseStorage(org.apache.qpid.server.model.VirtualHost<?> virtualHost);
+
+ abstract protected MessageStoreLogSubject getMessageStoreLogSubject();
public IConnectionRegistry getConnectionRegistry()
{
return _connectionRegistry;
}
- public VirtualHostConfiguration getConfiguration()
- {
- return _vhostConfig;
- }
-
public UUID getId()
{
return _id;
@@ -306,135 +284,9 @@ public abstract class AbstractVirtualHos
}
- protected void initialiseModel(VirtualHostConfiguration config)
+ protected void initialiseModel()
{
- _logger.debug("Loading configuration for virtualhost: " + config.getName());
-
-
_exchangeRegistry.initialise(_exchangeFactory);
-
- List<String> exchangeNames = config.getExchanges();
-
- for (String exchangeName : exchangeNames)
- {
- try
- {
- configureExchange(config.getExchangeConfiguration(exchangeName));
- }
- catch (UnknownExchangeException e)
- {
- throw new ServerScopedRuntimeException("Could not configure exchange " + exchangeName, e);
- }
- catch (ReservedExchangeNameException e)
- {
- throw new ServerScopedRuntimeException("Could not configure exchange " + exchangeName, e);
- }
- catch (AMQUnknownExchangeType e)
- {
- throw new ServerScopedRuntimeException("Could not configure exchange " + exchangeName, e);
- }
- }
-
- String[] queueNames = config.getQueueNames();
-
- for (Object queueNameObj : queueNames)
- {
- String queueName = String.valueOf(queueNameObj);
- try
- {
- configureQueue(config.getQueueConfiguration(queueName));
- }
- catch (ConfigurationException e)
- {
- throw new ServerScopedRuntimeException("Could not configure queue " + queueName, e);
- }
- }
- }
-
- private void configureExchange(ExchangeConfiguration exchangeConfiguration)
- throws UnknownExchangeException, ReservedExchangeNameException,
- AMQUnknownExchangeType
- {
- boolean durable = exchangeConfiguration.getDurable();
- boolean autodelete = exchangeConfiguration.getAutoDelete();
- try
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
-
- attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
- attributes.put(org.apache.qpid.server.model.Exchange.NAME, exchangeConfiguration.getName());
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE, exchangeConfiguration.getType());
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
- attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- autodelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
- ExchangeImpl newExchange = createExchange(attributes);
- }
- catch(ExchangeExistsException e)
- {
- _logger.info("Exchange " + exchangeConfiguration.getName() + " already defined. Configuration in XML file ignored");
- }
-
- }
-
- private void configureQueue(QueueConfiguration queueConfiguration)
- throws ConfigurationException
- {
- AMQQueue queue = _queueFactory.createAMQQueueImpl(queueConfiguration);
- String queueName = queue.getName();
-
- if (queue.isDurable())
- {
- DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue);
- }
-
- //get the exchange name (returns empty String if none was specified)
- String exchangeName = queueConfiguration.getExchange();
-
-
- if(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(exchangeName))
- {
- //get routing keys in configuration (returns empty list if none are defined)
- List<?> routingKeys = queueConfiguration.getRoutingKeys();
- if(!(routingKeys.isEmpty() || (routingKeys.size()==1 && routingKeys.contains(queueName))))
- {
- throw new ConfigurationException("Attempt to bind queue '" + queueName + "' with binding key(s) " +
- routingKeys + " without specifying an exchange");
- }
- }
- else
- {
- ExchangeImpl exchange = _exchangeRegistry.getExchange(exchangeName);
- if (exchange == null)
- {
- throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName);
- }
-
- //get routing keys in configuration (returns empty list if none are defined)
- List<?> routingKeys = queueConfiguration.getRoutingKeys();
-
- for (Object routingKeyNameObj : routingKeys)
- {
- String routingKey = String.valueOf(routingKeyNameObj);
-
- configureBinding(queue, exchange, routingKey, (Map) queueConfiguration.getBindingArguments(routingKey));
- }
-
- if (!routingKeys.contains(queueName))
- {
- //bind the queue to the named exchange using its name
- configureBinding(queue, exchange, queueName, null);
- }
- }
- }
-
- private void configureBinding(AMQQueue queue, ExchangeImpl exchange, String routingKey, Map<String,Object> arguments)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName());
- }
- exchange.addBinding(routingKey, queue, arguments);
}
public String getName()
@@ -717,43 +569,43 @@ public abstract class AbstractVirtualHos
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
}
- protected void closeStorage()
+ private void closeStorage()
{
- //Close MessageStore
if (getMessageStore() != null)
{
- //Remove MessageStore Interface should not throw Exception
try
{
- getMessageStore().close();
+ getMessageStore().closeMessageStore();
}
- catch (Exception e)
+ catch (StoreException e)
{
_logger.error("Failed to close message store", e);
}
}
if (getDurableConfigurationStore() != null)
{
- //Remove MessageStore Interface should not throw Exception
try
{
- getDurableConfigurationStore().close();
+ getDurableConfigurationStore().closeConfigurationStore();
+ MessageStoreLogSubject configurationStoreSubject = getConfigurationStoreLogSubject();
+ if (configurationStoreSubject != null)
+ {
+ getEventLogger().message(configurationStoreSubject, ConfigStoreMessages.CLOSE());
+ }
}
- catch (Exception e)
+ catch (StoreException e)
{
- _logger.error("Failed to close message store", e);
+ _logger.error("Failed to close configuration store", e);
}
}
+ getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CLOSED());
}
-
- protected Logger getLogger()
+ protected MessageStoreLogSubject getConfigurationStoreLogSubject()
{
- return _logger;
+ return null;
}
-
-
public VirtualHostRegistry getVirtualHostRegistry()
{
return _virtualHostRegistry;
@@ -889,9 +741,11 @@ public abstract class AbstractVirtualHos
{
case PERSISTENT_MESSAGE_SIZE_OVERFULL:
block();
+ _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.OVERFULL());
break;
case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
unblock();
+ _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.UNDERFULL());
break;
}
}
@@ -907,7 +761,7 @@ public abstract class AbstractVirtualHos
try
{
- initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ initialiseHouseKeeping(_model.getHousekeepingCheckPeriod());
finalState = State.ACTIVE;
}
finally
@@ -981,10 +835,10 @@ public abstract class AbstractVirtualHos
}
try
{
- session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(),
- _vhostConfig.getTransactionTimeoutOpenClose(),
- _vhostConfig.getTransactionTimeoutIdleWarn(),
- _vhostConfig.getTransactionTimeoutIdleClose());
+ session.checkTransactionStatus(_model.getStoreTransactionOpenTimeoutWarn(),
+ _model.getStoreTransactionOpenTimeoutClose(),
+ _model.getStoreTransactionIdleTimeoutWarn(),
+ _model.getStoreTransactionIdleTimeoutClose());
} catch (Exception e)
{
_logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
@@ -1039,49 +893,55 @@ public abstract class AbstractVirtualHos
@Override
public long getDefaultAlertThresholdMessageAge()
{
- return getConfiguration().getMaximumMessageAge();
+ return _model.getQueue_alertThresholdMessageAge();
}
@Override
public long getDefaultAlertThresholdMessageSize()
{
- return getConfiguration().getMaximumMessageSize();
+ return _model.getQueue_alertThresholdMessageSize();
}
@Override
public long getDefaultAlertThresholdQueueDepthMessages()
{
- return getConfiguration().getMaximumMessageCount();
+ return _model.getQueue_alertThresholdQueueDepthMessages();
}
@Override
public long getDefaultAlertThresholdQueueDepthBytes()
{
- return getConfiguration().getMaximumQueueDepth();
+ return _model.getQueue_alertThresholdQueueDepthBytes();
}
@Override
public long getDefaultAlertRepeatGap()
{
- return getConfiguration().getMinimumAlertRepeatGap();
+ return _model.getQueue_alertRepeatGap();
}
@Override
public long getDefaultQueueFlowControlSizeBytes()
{
- return getConfiguration().getCapacity();
+ return _model.getQueue_flowControlSizeBytes();
}
@Override
public long getDefaultQueueFlowResumeSizeBytes()
{
- return getConfiguration().getFlowResumeCapacity();
+ return _model.getQueue_flowResumeSizeBytes();
}
@Override
public int getDefaultMaximumDeliveryAttempts()
{
- return getConfiguration().getMaxDeliveryCount();
+ return _model.getQueue_maximumDeliveryAttempts();
+ }
+
+ @Override
+ public boolean getDefaultDeadLetterQueueEnabled()
+ {
+ return _model.isQueue_deadLetterQueueEnabled();
}
@Override
@@ -1096,4 +956,5 @@ public abstract class AbstractVirtualHos
{
return _model;
}
+
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java Thu Apr 3 19:58:53 2014
@@ -20,16 +20,22 @@
*/
package org.apache.qpid.server.virtualhost;
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.UUID;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.filter.FilterSupport;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
@@ -39,19 +45,35 @@ import org.apache.qpid.server.store.NonN
import org.apache.qpid.server.store.NullUpgrader;
import org.apache.qpid.server.store.UpgraderProvider;
-import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
-
public class DefaultUpgraderProvider implements UpgraderProvider
{
+ private static final Logger LOGGER = Logger.getLogger(DefaultUpgraderProvider.class);
+
public static final String EXCLUSIVE = "exclusive";
- private final ExchangeRegistry _exchangeRegistry;
+ public static final String NAME = "name";
private final VirtualHost _virtualHost;
- public DefaultUpgraderProvider(final VirtualHost virtualHost,
- final ExchangeRegistry exchangeRegistry)
+ @SuppressWarnings("serial")
+ private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ put("amq.direct", "direct");
+ put("amq.topic", "topic");
+ put("amq.fanout", "fanout");
+ put("amq.match", "headers");
+ }});
+
+ private final Map<String, UUID> _defaultExchangeIds;
+
+ public DefaultUpgraderProvider(final VirtualHost virtualHost)
{
_virtualHost = virtualHost;
- _exchangeRegistry = exchangeRegistry;
+ Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
+ for (String exchangeName : DEFAULT_EXCHANGES.keySet())
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, _virtualHost.getName());
+ defaultExchangeIds.put(exchangeName, id);
+ }
+ _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds);
}
public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer)
@@ -67,6 +89,8 @@ public class DefaultUpgraderProvider imp
currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
case 3:
currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader());
+ case 4:
+ currentUpgrader = addUpgrader(currentUpgrader, new Version4Upgrader());
case CURRENT_CONFIG_VERSION:
currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
break;
@@ -122,7 +146,12 @@ public class DefaultUpgraderProvider imp
private boolean isTopicExchange(ConfiguredObjectRecord entry)
{
- UUID exchangeId = entry.getParents().get("Exchange").getId();
+ ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
+ if (exchangeRecord == null)
+ {
+ return false;
+ }
+ UUID exchangeId = exchangeRecord.getId();
if(_records.containsKey(exchangeId))
{
@@ -132,8 +161,13 @@ public class DefaultUpgraderProvider imp
}
else
{
- return _exchangeRegistry.getExchange(exchangeId) != null
- && _exchangeRegistry.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE;
+ if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
+ {
+ return true;
+ }
+
+ return _virtualHost.getExchange(exchangeId) != null
+ && _virtualHost.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE;
}
}
@@ -214,9 +248,13 @@ public class DefaultUpgraderProvider imp
private boolean unknownExchange(final UUID exchangeId)
{
+ if (_defaultExchangeIds.containsValue(exchangeId))
+ {
+ return false;
+ }
ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()))
- || _exchangeRegistry.getExchange(exchangeId) != null);
+ || _virtualHost.getExchange(exchangeId) != null);
}
private boolean unknownQueue(final UUID queueId)
@@ -318,4 +356,53 @@ public class DefaultUpgraderProvider imp
}
}
+ private class Version4Upgrader extends NonNullUpgrader
+ {
+ private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
+
+ @Override
+ public void configuredObject(ConfiguredObjectRecord record)
+ {
+ if(Exchange.class.getSimpleName().equals(record.getType()))
+ {
+ Map<String, Object> attributes = record.getAttributes();
+ String name = (String)attributes.get(NAME);
+ _missingAmqpExchanges.remove(name);
+ }
+
+ getNextUpgrader().configuredObject(record);
+ }
+
+ @Override
+ public void complete()
+ {
+ for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
+ {
+ String name = entry.getKey();
+ String type = entry.getValue();
+ UUID id = _defaultExchangeIds.get(name);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Creating amqp exchange " + name + " with id " + id);
+ }
+
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type);
+
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
+
+ ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(_virtualHost.getId(), org.apache.qpid.server.model.VirtualHost.class.getSimpleName(), Collections.<String, Object>emptyMap());
+ ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(virtualHostRecord.getType(), virtualHostRecord));
+ getUpdateMap().put(id, record);
+
+ getNextUpgrader().configuredObject(record);
+
+ }
+
+ getNextUpgrader().complete();
+ }
+ }
+
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java Thu Apr 3 19:58:53 2014
@@ -1,4 +1,4 @@
-package org.apache.qpid.server.virtualhost;/*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,18 +18,22 @@ package org.apache.qpid.server.virtualho
* under the License.
*
*/
+package org.apache.qpid.server.virtualhost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
+
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.server.store.OperationalLoggingListener;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class StandardVirtualHost extends AbstractVirtualHost
{
@@ -37,69 +41,21 @@ public class StandardVirtualHost extends
private DurableConfigurationStore _durableConfigurationStore;
+ private MessageStoreLogSubject _messageStoreLogSubject;
+
+ private MessageStoreLogSubject _configurationStoreLogSubject;
+
StandardVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
org.apache.qpid.server.security.SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig, VirtualHost virtualHost)
+ VirtualHost virtualHost)
{
- super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost);
+ super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, virtualHost);
}
-
-
- private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig, VirtualHost virtualHost)
- {
- final Object storeTypeAttr = virtualHost.getAttribute(VirtualHost.STORE_TYPE);
- String storeType = storeTypeAttr == null ? null : String.valueOf(storeTypeAttr);
- MessageStore messageStore = null;
- if (storeType == null)
- {
- try
- {
- final Class<?> clazz = Class.forName(hostConfig.getMessageStoreClass());
- final Object o = clazz.newInstance();
-
- if (!(o instanceof MessageStore))
- {
- throw new ClassCastException(clazz + " does not implement " + MessageStore.class);
- }
-
- messageStore = (MessageStore) o;
- }
- catch (ClassNotFoundException e)
- {
- throw new ServerScopedRuntimeException("Failed to fina virtual host message store implementation, " +
- "check the classpath and the configuration", e);
- }
- catch (InstantiationException e)
- {
- throw new ServerScopedRuntimeException("Failed to initialise virtual host store, " +
- "check the configuration", e);
- }
- catch (IllegalAccessException e)
- {
- throw new ServerScopedRuntimeException("Failed to initialise virtual host store, " +
- "check the configuration", e);
- }
- }
- else
- {
- messageStore = new MessageStoreCreator().createMessageStore(storeType);
- }
-
- final
- MessageStoreLogSubject
- storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName());
- OperationalLoggingListener.listen(messageStore, storeLogSubject, getEventLogger());
-
- return messageStore;
- }
-
- private DurableConfigurationStore initialiseConfigurationStore(VirtualHost virtualHost)
+ private DurableConfigurationStore initialiseConfigurationStore(String storeType)
{
DurableConfigurationStore configurationStore;
- final Object storeTypeAttr = virtualHost.getAttribute(VirtualHost.CONFIG_STORE_TYPE);
- String storeType = storeTypeAttr == null ? null : String.valueOf(storeTypeAttr);
if(storeType != null)
{
@@ -117,24 +73,52 @@ public class StandardVirtualHost extends
return configurationStore;
}
-
- protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost)
+ @Override
+ protected void initialiseStorage(VirtualHost virtualHost)
{
- _messageStore = initialiseMessageStore(hostConfig, virtualHost);
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+ String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE);
+ _messageStore = MessageStoreFactory.FACTORY_LOADER.get(storeType).createMessageStore();
+ _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
+ getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED());
+
+ Map<String, Object> configurationStoreSettings = virtualHost.getConfigurationStoreSettings();
+ String configurationStoreType = configurationStoreSettings == null ? null : (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE);
+ _durableConfigurationStore = initialiseConfigurationStore(configurationStoreType);
+ boolean combinedStores = _durableConfigurationStore == _messageStore;
+ if (combinedStores)
+ {
+ configurationStoreSettings = new HashMap<String,Object>(messageStoreSettings);
+ configurationStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true);
+ }
+
+ if (!combinedStores)
+ {
+ _configurationStoreLogSubject = new MessageStoreLogSubject(getName(), _durableConfigurationStore.getClass().getSimpleName());
+ getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.CREATED());
+ }
- _durableConfigurationStore = initialiseConfigurationStore(virtualHost);
+ _durableConfigurationStore.openConfigurationStore(virtualHost, configurationStoreSettings);
- DurableConfigurationRecoverer configRecoverer =
- new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger());
- _durableConfigurationStore.configureConfigStore(virtualHost, configRecoverer);
+ _messageStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings());
+
+ getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
+
+ if (_configurationStoreLogSubject != null)
+ {
+ getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
+ }
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
- _messageStore.configureMessageStore(virtualHost, recoveryHandler, recoveryHandler);
+ DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+ new DefaultUpgraderProvider(this), getEventLogger());
- initialiseModel(hostConfig);
+ _durableConfigurationStore.recoverConfigurationStore(configRecoverer);
- _messageStore.activate();
+ // If store does not have entries for standard exchanges (amq.*), the following will create them.
+ initialiseModel();
+
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getMessageStoreLogSubject());
+ _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
attainActivation();
}
@@ -151,4 +135,15 @@ public class StandardVirtualHost extends
return _durableConfigurationStore;
}
-}
+ @Override
+ protected MessageStoreLogSubject getMessageStoreLogSubject()
+ {
+ return _messageStoreLogSubject;
+ }
+
+ @Override
+ protected MessageStoreLogSubject getConfigurationStoreLogSubject()
+ {
+ return _configurationStoreLogSubject;
+ }
+}
\ No newline at end of file
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java Thu Apr 3 19:58:53 2014
@@ -19,16 +19,13 @@ package org.apache.qpid.server.virtualho
*
*/
-import java.util.LinkedHashMap;
+import java.util.Collection;
import java.util.Map;
-import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
+
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.MessageStoreConstants;
-import org.apache.qpid.server.store.MessageStoreCreator;
+import org.apache.qpid.server.store.MessageStore;
public class StandardVirtualHostFactory implements VirtualHostFactory
{
@@ -45,74 +42,48 @@ public class StandardVirtualHostFactory
public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
org.apache.qpid.server.security.SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig,
org.apache.qpid.server.model.VirtualHost virtualHost)
{
- return new StandardVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost);
+ return new StandardVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, virtualHost);
}
- public static final String STORE_TYPE_ATTRIBUTE = org.apache.qpid.server.model.VirtualHost.STORE_TYPE;
- public static final String STORE_PATH_ATTRIBUTE = org.apache.qpid.server.model.VirtualHost.STORE_PATH;
-
@Override
public void validateAttributes(Map<String, Object> attributes)
{
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS);
+ if (messageStoreSettings == null)
+ {
+ throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required.");
+ }
+
+ Object storeType = messageStoreSettings.get(MessageStore.STORE_TYPE);
// need store type and path
- Object storeType = attributes.get(STORE_TYPE_ATTRIBUTE);
- if(!(storeType instanceof String))
- {
+ Collection<String> knownTypes = MessageStoreFactory.FACTORY_LOADER.getSupportedTypes();
- throw new IllegalArgumentException("Attribute '"+ STORE_TYPE_ATTRIBUTE
- +"' is required and must be of type String.");
- }
- final MessageStoreCreator storeCreator = new MessageStoreCreator();
- if(!storeCreator.isValidType((String)storeType))
+ if (storeType == null)
{
- throw new IllegalArgumentException("Attribute '"+ STORE_TYPE_ATTRIBUTE
- +"' has value '"+storeType+"' which is not one of the valid values: "
- + storeCreator.getStoreTypes() + ".");
-
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
+ +"' is required in attribute " + org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + ". Known types are : " + knownTypes);
}
-
- for(MessageStoreFactory factory : storeCreator.getFactories())
+ else if (!(storeType instanceof String))
{
- if(factory.getType().equalsIgnoreCase((String)storeType))
- {
- factory.validateAttributes(attributes);
- }
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
+ +"' is required and must be of type String. "
+ +"Known types are : " + knownTypes);
}
- }
-
- @Override
- public Map<String,Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter)
- {
- Map<String,Object> convertedMap = new LinkedHashMap<String, Object>();
- convertedMap.put("store.type", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE));
- convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH));
-
- return convertedMap;
- }
-
- @Override
- public Map<String, Object> convertVirtualHostConfiguration(Configuration configuration)
- {
- Map<String,Object> convertedMap = new LinkedHashMap<String, Object>();
- Configuration storeConfiguration = configuration.subset("store");
- convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_TYPE, storeConfiguration.getString("type"));
- convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
-
- convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY));
- convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY));
-
- for(MessageStoreFactory mf : new MessageStoreCreator().getFactories())
+ MessageStoreFactory factory = MessageStoreFactory.FACTORY_LOADER.get((String)storeType);
+ if(factory == null)
{
- convertedMap.putAll(mf.convertStoreConfiguration(storeConfiguration));
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
+ +"' has value '" + storeType + "' which is not one of the valid values: "
+ + "Known types are : " + knownTypes);
}
- return convertedMap;
+ factory.validateAttributes(attributes);
}
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Apr 3 19:58:53 2014
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledFut
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
@@ -49,8 +48,6 @@ public interface VirtualHost extends Dur
{
IConnectionRegistry getConnectionRegistry();
- VirtualHostConfiguration getConfiguration();
-
String getName();
AMQQueue getQueue(String name);
@@ -137,6 +134,8 @@ public interface VirtualHost extends Dur
int getDefaultMaximumDeliveryAttempts();
+ boolean getDefaultDeadLetterQueueEnabled();
+
TaskExecutor getTaskExecutor();
org.apache.qpid.server.model.VirtualHost getModel();
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Thu Apr 3 19:58:53 2014
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -62,18 +63,18 @@ public class VirtualHostConfigRecoveryHa
private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
private final EventLogger _eventLogger;
- private MessageStoreLogSubject _logSubject;
+ private final MessageStoreLogSubject _logSubject;
private MessageStore _store;
- public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
+ public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, MessageStoreLogSubject logSubject)
{
_virtualHost = virtualHost;
_eventLogger = virtualHost.getEventLogger();
+ _logSubject = logSubject;
}
public VirtualHostConfigRecoveryHandler begin(MessageStore store)
{
- _logSubject = new MessageStoreLogSubject(_virtualHost.getName(), store.getClass().getSimpleName());
_store = store;
_eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
return this;
@@ -81,6 +82,7 @@ public class VirtualHostConfigRecoveryHa
public StoredMessageRecoveryHandler begin()
{
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
return this;
}
@@ -232,10 +234,9 @@ public class VirtualHostConfigRecoveryHa
m.remove();
}
_eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
- }
- public void complete()
- {
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size()));
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
}
public void queueEntry(final UUID queueId, long messageId)
@@ -314,8 +315,6 @@ public class VirtualHostConfigRecoveryHa
_eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
}
-
-
return this;
}
Copied: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory (from r1582544, qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory?p2=qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory&p1=qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory&r1=1582544&r2=1584365&rev=1584365&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory Thu Apr 3 19:58:53 2014
@@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.
#
+org.apache.qpid.server.security.FileKeyStoreFactory
+org.apache.qpid.server.security.FileTrustStoreFactory
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory
org.apache.qpid.server.security.auth.manager.Base64MD5PasswordFileAuthenticationManagerFactory
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory
@@ -23,4 +25,16 @@ org.apache.qpid.server.security.auth.man
org.apache.qpid.server.security.auth.manager.PlainPasswordFileAuthenticationManagerFactory
org.apache.qpid.server.security.auth.manager.SimpleLDAPAuthenticationManagerFactory
org.apache.qpid.server.security.auth.manager.ScramSHA1AuthenticationManagerFactory
+org.apache.qpid.server.security.auth.manager.ScramSHA1UserRecoverer
+org.apache.qpid.server.model.port.AmqpPortFactory
+org.apache.qpid.server.model.port.HttpPortFactory
+org.apache.qpid.server.model.port.JmxPortFactory
+org.apache.qpid.server.model.port.RmiPortFactory
+org.apache.qpid.server.model.port.PortFactory
+org.apache.qpid.server.model.adapter.BrokerAdapterFactory
+org.apache.qpid.server.model.adapter.StandardVirtualHostAdapterFactory
+org.apache.qpid.server.model.adapter.FileBasedGroupProviderFactory
+org.apache.qpid.server.model.adapter.FileSystemPreferencesProviderFactory
+
+
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/resources/initial-config.json
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/resources/initial-config.json?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/resources/initial-config.json (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/resources/initial-config.json Thu Apr 3 19:58:53 2014
@@ -21,7 +21,7 @@
{
"name": "Broker",
"storeVersion": 1,
- "modelVersion": "1.2",
+ "modelVersion": "1.4",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
"name" : "passwordFile",
@@ -55,8 +55,10 @@
"virtualhosts" : [ {
"name" : "default",
"type" : "STANDARD",
- "storeType" : "DERBY",
- "storePath" : "${qpid.work_dir}/derbystore/default"
+ "messageStoreSettings" : {
+ "storeType" : "DERBY",
+ "storePath" : "${qpid.work_dir}/derbystore/default"
+ }
} ],
"plugins" : [ {
"pluginType" : "MANAGEMENT-HTTP",
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java Thu Apr 3 19:58:53 2014
@@ -20,28 +20,37 @@
*/
package org.apache.qpid.server.configuration;
-import java.io.File;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
+import java.io.File;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class BrokerConfigurationStoreCreatorTest extends QpidTestCase
{
private File _userStoreLocation;
private BrokerConfigurationStoreCreator _storeCreator;
+ private SystemContext _systemContext;
public void setUp() throws Exception
{
@@ -56,6 +65,13 @@ public class BrokerConfigurationStoreCre
}
_storeCreator = new BrokerConfigurationStoreCreator();
_userStoreLocation = new File(TMP_FOLDER, "_store_" + System.currentTimeMillis() + "_" + getTestName());
+ final BrokerOptions brokerOptions = mock(BrokerOptions.class);
+ when(brokerOptions.getConfigurationStoreLocation()).thenReturn(_userStoreLocation.getAbsolutePath());
+ _systemContext = new SystemContext(new TaskExecutor(),
+ new ConfiguredObjectFactory(),
+ mock(EventLogger.class),
+ mock(LogRecorder.class),
+ brokerOptions);
}
public void tearDown() throws Exception
@@ -73,13 +89,15 @@ public class BrokerConfigurationStoreCre
}
}
+
public void testCreateJsonStore()
{
- ConfigurationEntryStore store = _storeCreator.createStore(_userStoreLocation.getAbsolutePath(), "json", BrokerOptions.DEFAULT_INITIAL_CONFIG_LOCATION, false, new BrokerOptions().getConfigProperties());
+ ConfigurationEntryStore store = _storeCreator.createStore(_systemContext, "json", BrokerOptions.DEFAULT_INITIAL_CONFIG_LOCATION, false, new BrokerOptions().getConfigProperties());
assertNotNull("Store was not created", store);
assertTrue("File should exists", _userStoreLocation.exists());
assertTrue("File size should be greater than 0", _userStoreLocation.length() > 0);
- JsonConfigurationEntryStore jsonStore = new JsonConfigurationEntryStore(_userStoreLocation.getAbsolutePath(), null, false, Collections.<String,String>emptyMap());
+ JsonConfigurationEntryStore jsonStore = new JsonConfigurationEntryStore(_systemContext, null, false, Collections
+ .<String,String>emptyMap());
Set<UUID> childrenIds = jsonStore.getRootEntry().getChildrenIds();
assertFalse("Unexpected children: " + childrenIds, childrenIds.isEmpty());
}
@@ -116,11 +134,11 @@ public class BrokerConfigurationStoreCre
File _initialStoreFile = TestFileUtils.createTempFile(this, ".json", brokerJson);
- ConfigurationEntryStore store = _storeCreator.createStore(_userStoreLocation.getAbsolutePath(), "json", _initialStoreFile.getAbsolutePath(), false, Collections.<String,String>emptyMap());
+ ConfigurationEntryStore store = _storeCreator.createStore(_systemContext, "json", _initialStoreFile.getAbsolutePath(), false, Collections.<String,String>emptyMap());
assertNotNull("Store was not created", store);
assertTrue("File should exists", _userStoreLocation.exists());
assertTrue("File size should be greater than 0", _userStoreLocation.length() > 0);
- JsonConfigurationEntryStore jsonStore = new JsonConfigurationEntryStore(_userStoreLocation.getAbsolutePath(), null, false, Collections.<String,String>emptyMap());
+ JsonConfigurationEntryStore jsonStore = new JsonConfigurationEntryStore(_systemContext, null, false, Collections.<String,String>emptyMap());
ConfigurationEntry entry = jsonStore.getRootEntry();
assertEquals("Unexpected root id", testBrokerId, entry.getId());
Map<String, Object> attributes = entry.getAttributes();
@@ -132,13 +150,13 @@ public class BrokerConfigurationStoreCre
if(overwrite)
{
- ConfigurationEntryStore overwrittenStore = _storeCreator.createStore(_userStoreLocation.getAbsolutePath(), "json", BrokerOptions.DEFAULT_INITIAL_CONFIG_LOCATION, true, new BrokerOptions().getConfigProperties());
+ ConfigurationEntryStore overwrittenStore = _storeCreator.createStore(_systemContext, "json", BrokerOptions.DEFAULT_INITIAL_CONFIG_LOCATION, true, new BrokerOptions().getConfigProperties());
assertNotNull("Store was not created", overwrittenStore);
assertTrue("File should exists", _userStoreLocation.exists());
assertTrue("File size should be greater than 0", _userStoreLocation.length() > 0);
//check the contents reflect the test store content having been overwritten with the default store
- JsonConfigurationEntryStore reopenedOverwrittenStore = new JsonConfigurationEntryStore(_userStoreLocation.getAbsolutePath(), null, false, Collections.<String,String>emptyMap());
+ JsonConfigurationEntryStore reopenedOverwrittenStore = new JsonConfigurationEntryStore(_systemContext, null, false, Collections.<String,String>emptyMap());
entry = reopenedOverwrittenStore.getRootEntry();
assertFalse("Root id did not change, store content was not overwritten", testBrokerId.equals(entry.getId()));
attributes = entry.getAttributes();
@@ -154,7 +172,7 @@ public class BrokerConfigurationStoreCre
{
try
{
- _storeCreator.createStore(_userStoreLocation.getAbsolutePath(), "other", null, false, Collections.<String,String>emptyMap());
+ _storeCreator.createStore(_systemContext, "other", null, false, Collections.<String,String>emptyMap());
fail("Store is not yet supported");
}
catch(IllegalConfigurationException e)
@@ -162,4 +180,5 @@ public class BrokerConfigurationStoreCre
// pass
}
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org