You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/07/05 17:30:54 UTC
svn commit: r1500047 [2/2] - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/a...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Fri Jul 5 15:30:53 2013
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,7 +34,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -42,10 +43,12 @@ import org.apache.qpid.server.exchange.D
import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
@@ -56,6 +59,7 @@ import org.apache.qpid.server.queue.Queu
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
@@ -259,7 +263,7 @@ public abstract class AbstractVirtualHos
{
_logger.debug("Loading configuration for virtualhost: " + config.getName());
- _exchangeRegistry.initialise();
+ _exchangeRegistry.initialise(_exchangeFactory);
List<String> exchangeNames = config.getExchanges();
@@ -279,25 +283,18 @@ public abstract class AbstractVirtualHos
private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException
{
- AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
-
- Exchange exchange;
- exchange = _exchangeRegistry.getExchange(exchangeName);
- if (exchange == null)
+ boolean durable = exchangeConfiguration.getDurable();
+ boolean autodelete = exchangeConfiguration.getAutoDelete();
+ try
{
-
- AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
- boolean durable = exchangeConfiguration.getDurable();
- boolean autodelete = exchangeConfiguration.getAutoDelete();
-
- Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
- _exchangeRegistry.registerExchange(newExchange);
-
- if (newExchange.isDurable())
- {
- DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), newExchange);
- }
+ Exchange newExchange = createExchange(null, exchangeConfiguration.getName(), exchangeConfiguration.getType(), durable, autodelete,
+ null);
}
+ catch(ExchangeExistsException e)
+ {
+ _logger.info("Exchange " + exchangeConfiguration.getName() + " already defined. Configuration in XML file ignored");
+ }
+
}
private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
@@ -374,16 +371,162 @@ public abstract class AbstractVirtualHos
return _queueRegistry;
}
- public ExchangeRegistry getExchangeRegistry()
+ protected ExchangeRegistry getExchangeRegistry()
{
return _exchangeRegistry;
}
- public ExchangeFactory getExchangeFactory()
+ protected ExchangeFactory getExchangeFactory()
{
return _exchangeFactory;
}
+ @Override
+ public void addVirtualHostListener(final VirtualHostListener listener)
+ {
+ _exchangeRegistry.addRegistryChangeListener(new ExchangeRegistry.RegistryChangeListener()
+ {
+ @Override
+ public void exchangeRegistered(Exchange exchange)
+ {
+ listener.exchangeRegistered(exchange);
+ }
+
+ @Override
+ public void exchangeUnregistered(Exchange exchange)
+ {
+ listener.exchangeUnregistered(exchange);
+ }
+ });
+ _queueRegistry.addRegistryChangeListener(new QueueRegistry.RegistryChangeListener()
+ {
+ @Override
+ public void queueRegistered(AMQQueue queue)
+ {
+ listener.queueRegistered(queue);
+ }
+
+ @Override
+ public void queueUnregistered(AMQQueue queue)
+ {
+ listener.queueUnregistered(queue);
+ }
+ });
+ _connectionRegistry.addRegistryChangeListener(new IConnectionRegistry.RegistryChangeListener()
+ {
+ @Override
+ public void connectionRegistered(AMQConnectionModel connection)
+ {
+ listener.connectionRegistered(connection);
+ }
+
+ @Override
+ public void connectionUnregistered(AMQConnectionModel connection)
+ {
+ listener.connectionUnregistered(connection);
+ }
+ });
+ }
+
+ @Override
+ public Exchange getExchange(String name)
+ {
+ return _exchangeRegistry.getExchange(name);
+ }
+
+ @Override
+ public Exchange getDefaultExchange()
+ {
+ return _exchangeRegistry.getDefaultExchange();
+ }
+
+ @Override
+ public Collection<Exchange> getExchanges()
+ {
+ return Collections.unmodifiableCollection(_exchangeRegistry.getExchanges());
+ }
+
+ @Override
+ public Collection<ExchangeType<? extends Exchange>> getExchangeTypes()
+ {
+ return _exchangeFactory.getRegisteredTypes();
+ }
+
+ @Override
+ public Exchange createExchange(UUID id,
+ String name,
+ String type,
+ boolean durable,
+ boolean autoDelete,
+ String alternateExchangeName)
+ throws AMQException
+ {
+
+ if(_exchangeRegistry.isReservedExchangeName(name))
+ {
+ throw new ReservedExchangeNameException(name);
+ }
+ synchronized (_exchangeRegistry)
+ {
+ Exchange existing;
+ if((existing = _exchangeRegistry.getExchange(name)) !=null)
+ {
+ throw new ExchangeExistsException(name,existing);
+ }
+ Exchange alternateExchange;
+
+ if(alternateExchangeName != null)
+ {
+ alternateExchange = _exchangeRegistry.getExchange(alternateExchangeName);
+ if(alternateExchange == null)
+ {
+ throw new UnknownExchangeException(alternateExchangeName);
+ }
+ }
+ else
+ {
+ alternateExchange = null;
+ }
+
+ if(id == null)
+ {
+ id = UUIDGenerator.generateExchangeUUID(name, getName());
+ }
+
+ Exchange exchange = _exchangeFactory.createExchange(id, name, type, durable, autoDelete);
+ exchange.setAlternateExchange(alternateExchange);
+ _exchangeRegistry.registerExchange(exchange);
+ if(durable)
+ {
+ DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), exchange);
+ }
+ return exchange;
+ }
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange, boolean force) throws AMQException
+ {
+ if(exchange.hasReferrers())
+ {
+ throw new ExchangeIsAlternateException(exchange.getName());
+ }
+
+ for(ExchangeType type : getExchangeTypes())
+ {
+ if(type.getDefaultExchangeName().toString().equals( exchange.getName() ))
+ {
+ throw new RequiredExchangeException(exchange.getName());
+ }
+ }
+ _exchangeRegistry.unregisterExchange(exchange.getName(), !force);
+ if (exchange.isDurable() && !exchange.isAutoDelete())
+ {
+ DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), exchange);
+ }
+
+ }
+
public SecurityManager getSecurityManager()
{
return _securityManager;
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java?rev=1500047&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java Fri Jul 5 15:30:53 2013
@@ -0,0 +1,39 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.exchange.Exchange;
+
+public class ExchangeExistsException extends AMQException
+{
+ private final Exchange _existing;
+
+ public ExchangeExistsException(String name, Exchange existing)
+ {
+ super(name);
+ _existing = existing;
+ }
+
+ public Exchange getExistingExchange()
+ {
+ return _existing;
+ }
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java?rev=1500047&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java Fri Jul 5 15:30:53 2013
@@ -0,0 +1,30 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+
+public class ExchangeIsAlternateException extends AMQException
+{
+ public ExchangeIsAlternateException(String name)
+ {
+ super(name);
+ }
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java?rev=1500047&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java Fri Jul 5 15:30:53 2013
@@ -0,0 +1,30 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+
+public class RequiredExchangeException extends AMQException
+{
+ public RequiredExchangeException(String name)
+ {
+ super(name);
+ }
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java?rev=1500047&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java Fri Jul 5 15:30:53 2013
@@ -0,0 +1,38 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+
+public class ReservedExchangeNameException extends AMQException
+{
+ private final String _name;
+
+ public ReservedExchangeNameException(String name)
+ {
+ super("Attempt to create an exchange using a reserved name or prefix: " + name);
+ _name = name;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java Fri Jul 5 15:30:53 2013
@@ -96,7 +96,7 @@ public class StandardVirtualHost extends
_durableConfigurationStore = initialiseConfigurationStore(virtualHost);
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
_durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost);
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java?rev=1500047&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java Fri Jul 5 15:30:53 2013
@@ -0,0 +1,38 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+
+public class UnknownExchangeException extends AMQException
+{
+ private final String _exchangeName;
+
+ public UnknownExchangeException(String exchangeName)
+ {
+ super(exchangeName);
+ _exchangeName = exchangeName;
+ }
+
+ public String getExchangeName()
+ {
+ return _exchangeName;
+ }
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Jul 5 15:30:53 2013
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
+import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
@@ -45,9 +50,23 @@ public interface VirtualHost extends Dur
QueueRegistry getQueueRegistry();
- ExchangeRegistry getExchangeRegistry();
+ Exchange createExchange(UUID id,
+ String exchange,
+ String type,
+ boolean durable,
+ boolean autoDelete,
+ String alternateExchange)
+ throws AMQException;
- ExchangeFactory getExchangeFactory();
+ void removeExchange(Exchange exchange, boolean force) throws AMQException;
+
+ Exchange getExchange(String name);
+
+ Exchange getDefaultExchange();
+
+ Collection<Exchange> getExchanges();
+
+ Collection<ExchangeType<? extends Exchange>> getExchangeTypes();
DurableConfigurationStore getDurableConfigurationStore();
@@ -55,6 +74,8 @@ public interface VirtualHost extends Dur
SecurityManager getSecurityManager();
+ void addVirtualHostListener(VirtualHostListener listener);
+
void close();
UUID getId();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Fri Jul 5 15:30:53 2013
@@ -31,9 +31,10 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
@@ -82,12 +83,19 @@ public class VirtualHostConfigRecoveryHa
private final Map<String, Map<UUID, Map<String, Object>>> _configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>();
+ private final ExchangeRegistry _exchangeRegistry;
+ private final ExchangeFactory _exchangeFactory;
+
private MessageStoreLogSubject _logSubject;
private MessageStore _store;
- public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
+ public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost,
+ ExchangeRegistry exchangeRegistry,
+ ExchangeFactory exchangeFactory)
{
_virtualHost = virtualHost;
+ _exchangeRegistry = exchangeRegistry;
+ _exchangeFactory = exchangeFactory;
}
@Override
@@ -120,7 +128,7 @@ public class VirtualHostConfigRecoveryHa
if (alternateExchangeId != null)
{
- Exchange altExchange = _virtualHost.getExchangeRegistry().getExchange(alternateExchangeId);
+ Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId);
if (altExchange == null)
{
_logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
@@ -146,12 +154,11 @@ public class VirtualHostConfigRecoveryHa
try
{
Exchange exchange;
- AMQShortString exchangeNameSS = new AMQShortString(exchangeName);
- exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
+ exchange = _exchangeRegistry.getExchange(exchangeName);
if (exchange == null)
{
- exchange = _virtualHost.getExchangeFactory().createExchange(id, exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
+ exchange = _exchangeFactory.createExchange(id, exchangeName, type, true, autoDelete);
+ _exchangeRegistry.registerExchange(exchange);
}
}
catch (AMQException e)
@@ -352,7 +359,7 @@ public class VirtualHostConfigRecoveryHa
{
try
{
- Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeId);
+ Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
if (exchange == null)
{
_logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId);
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java?rev=1500047&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java Fri Jul 5 15:30:53 2013
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public interface VirtualHostListener
+{
+
+ public void queueRegistered(AMQQueue queue);
+
+ public void queueUnregistered(AMQQueue queue);
+
+ public void connectionRegistered(AMQConnectionModel connection);
+
+ public void connectionUnregistered(AMQConnectionModel connection);
+
+ public void exchangeRegistered(Exchange exchange);
+
+ public void exchangeUnregistered(Exchange exchange);
+}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java Fri Jul 5 15:30:53 2013
@@ -179,8 +179,8 @@ public class DefaultExchangeFactoryTest
}
@Override
- public Exchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket,
- boolean autoDelete) throws AMQException
+ public Exchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable,
+ boolean autoDelete) throws AMQException
{
return null;
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Fri Jul 5 15:30:53 2013
@@ -64,7 +64,7 @@ public class FanoutExchangeTest extends
when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
- _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
+ _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, false);
}
public void testIsBoundAMQShortStringFieldTableAMQQueueWhenQueueIsNull()
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Fri Jul 5 15:30:53 2013
@@ -68,7 +68,7 @@ public class HeadersExchangeTest extends
when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
- _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
+ _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, false);
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java Fri Jul 5 15:30:53 2013
@@ -45,7 +45,7 @@ public class BindingLogSubjectTest exten
_testVhost = BrokerTestHelper.createVirtualHost("test");
_routingKey = new AMQShortString("RoutingKey");
- _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct");
+ _exchange = _testVhost.getExchange("amq.direct");
_queue = new MockAMQQueue("BindingLogSubjectTest");
((MockAMQQueue) _queue).setVirtualHost(_testVhost);
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java Fri Jul 5 15:30:53 2013
@@ -40,7 +40,7 @@ public class ExchangeLogSubjectTest exte
_testVhost = BrokerTestHelper.createVirtualHost("test");
- _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct");
+ _exchange = _testVhost.getExchange("amq.direct");
_subject = new ExchangeLogSubject(_exchange,_testVhost);
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Fri Jul 5 15:30:53 2013
@@ -142,25 +142,24 @@ public class AMQQueueFactoryTest extends
fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
String queueName = "testDeadLetterQueueEnabled";
- AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
QueueRegistry qReg = _virtualHost.getQueueRegistry();
- ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
- assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
_virtualHost, FieldTable.convertToMap(fieldTable));
Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
- assertEquals("Alternate exchange name was not as expected", dlExchangeName.asString(), altExchange.getName());
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
- assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+ assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
AMQQueue dlQueue = qReg.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
@@ -180,14 +179,13 @@ public class AMQQueueFactoryTest extends
public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception
{
String queueName = "testDeadLetterQueueEnabled";
- AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
QueueRegistry qReg = _virtualHost.getQueueRegistry();
- ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
- assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
_virtualHost, null);
@@ -195,11 +193,11 @@ public class AMQQueueFactoryTest extends
assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
- assertEquals("Alternate exchange name was not as expected", dlExchangeName.toString(), altExchange.getName());
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
- assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+ assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
AMQQueue dlQueue = qReg.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
@@ -222,20 +220,19 @@ public class AMQQueueFactoryTest extends
fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false);
String queueName = "testDeadLetterQueueDisabled";
- AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
QueueRegistry qReg = _virtualHost.getQueueRegistry();
- ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
- assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
_virtualHost, FieldTable.convertToMap(fieldTable));
assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
- assertNull("The alternate exchange should still not exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName));
assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName));
@@ -255,14 +252,13 @@ public class AMQQueueFactoryTest extends
fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
- AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
QueueRegistry qReg = _virtualHost.getQueueRegistry();
- ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
- assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
//create an autodelete queue
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false,
@@ -271,7 +267,7 @@ public class AMQQueueFactoryTest extends
//ensure that the autodelete property overrides the request to enable DLQ
assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
- assertNull("The alternate exchange should not exist as queue is autodelete", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName));
assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName));
//only 1 queue should have been registered
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Jul 5 15:30:53 2013
@@ -112,7 +112,7 @@ public class SimpleAMQQueueTest extends
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(),
false, false, _virtualHost, FieldTable.convertToMap(_arguments));
- _exchange = (DirectExchange) _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString());
}
@Override
@@ -423,7 +423,7 @@ public class SimpleAMQQueueTest extends
assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size());
/* Now release the first message only, causing it to be requeued */
- queueEntries.get(0).release();
+ queueEntries.get(0).release();
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Fri Jul 5 15:30:53 2013
@@ -21,6 +21,8 @@
package org.apache.qpid.server.store;
+import java.util.ArrayList;
+import java.util.Collection;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -78,9 +80,9 @@ public class MessageStoreTest extends Qp
public static final String SELECTOR_VALUE = "Test = 'MST'";
public static final String LVQ_KEY = "MST-LVQ-KEY";
- private AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
- private AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
- private AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
+ private String nonDurableExchangeName = "MST-NonDurableDirectExchange";
+ private String directExchangeName = "MST-DirectExchange";
+ private String topicExchangeName = "MST-TopicExchange";
private AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
private AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
@@ -365,12 +367,12 @@ public class MessageStoreTest extends Qp
*/
public void testExchangePersistence() throws Exception
{
- int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
+ int origExchangeCount = getVirtualHost().getExchanges().size();
- Map<AMQShortString, Exchange> oldExchanges = createExchanges();
+ Map<String, Exchange> oldExchanges = createExchanges();
assertEquals("Incorrect number of exchanges registered before recovery",
- origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size());
+ origExchangeCount + 3, getVirtualHost().getExchanges().size());
reloadVirtualHost();
@@ -385,31 +387,28 @@ public class MessageStoreTest extends Qp
*/
public void testDurableExchangeRemoval() throws Exception
{
- int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
+ int origExchangeCount = getVirtualHost().getExchanges().size();
createExchange(DirectExchange.TYPE, directExchangeName, true);
- ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
assertEquals("Incorrect number of exchanges registered before recovery",
- origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
+ origExchangeCount + 1, getVirtualHost().getExchanges().size());
reloadVirtualHost();
- exchangeRegistry = getVirtualHost().getExchangeRegistry();
assertEquals("Incorrect number of exchanges registered after first recovery",
- origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
+ origExchangeCount + 1, getVirtualHost().getExchanges().size());
//test that removing the exchange means it is not recovered next time
- final Exchange exchange = exchangeRegistry.getExchange(directExchangeName);
+ final Exchange exchange = getVirtualHost().getExchange(directExchangeName);
DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange);
reloadVirtualHost();
- exchangeRegistry = getVirtualHost().getExchangeRegistry();
assertEquals("Incorrect number of exchanges registered after second recovery",
- origExchangeCount, exchangeRegistry.getExchangeNames().size());
+ origExchangeCount, getVirtualHost().getExchanges().size());
assertNull("Durable exchange was not removed:" + directExchangeName,
- exchangeRegistry.getExchange(directExchangeName));
+ getVirtualHost().getExchange(directExchangeName));
}
/**
@@ -420,12 +419,12 @@ public class MessageStoreTest extends Qp
*/
public void testBindingPersistence() throws Exception
{
- int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
+ int origExchangeCount = getVirtualHost().getExchanges().size();
createAllQueues();
createAllTopicQueues();
- Map<AMQShortString, Exchange> exchanges = createExchanges();
+ Map<String, Exchange> exchanges = createExchanges();
Exchange nonDurableExchange = exchanges.get(nonDurableExchangeName);
Exchange directExchange = exchanges.get(directExchangeName);
@@ -436,7 +435,7 @@ public class MessageStoreTest extends Qp
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
assertEquals("Incorrect number of exchanges registered before recovery",
- origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size());
+ origExchangeCount + 3, getVirtualHost().getExchanges().size());
reloadVirtualHost();
@@ -469,8 +468,7 @@ public class MessageStoreTest extends Qp
assertEquals("Incorrect number of bindings registered after first recovery",
1, queueRegistry.getQueue(durableQueueName).getBindings().size());
- ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
- exch = exchangeRegistry.getExchange(directExchangeName);
+ exch = getVirtualHost().getExchange(directExchangeName);
assertNotNull("Exchange was not recovered", exch);
//remove the binding and verify result after recovery
@@ -488,26 +486,30 @@ public class MessageStoreTest extends Qp
* and that the new exchanges are not the same objects as the provided list (i.e. that the
* reload actually generated new exchange objects)
*/
- private void validateExchanges(int originalNumExchanges, Map<AMQShortString, Exchange> oldExchanges)
+ private void validateExchanges(int originalNumExchanges, Map<String, Exchange> oldExchanges)
{
- ExchangeRegistry registry = getVirtualHost().getExchangeRegistry();
-
+ Collection<Exchange> exchanges = getVirtualHost().getExchanges();
+ Collection<String> exchangeNames = new ArrayList(exchanges.size());
+ for(Exchange exchange : exchanges)
+ {
+ exchangeNames.add(exchange.getName());
+ }
assertTrue(directExchangeName + " exchange NOT reloaded",
- registry.getExchangeNames().contains(directExchangeName));
+ exchangeNames.contains(directExchangeName));
assertTrue(topicExchangeName + " exchange NOT reloaded",
- registry.getExchangeNames().contains(topicExchangeName));
+ exchangeNames.contains(topicExchangeName));
assertTrue(nonDurableExchangeName + " exchange reloaded",
- !registry.getExchangeNames().contains(nonDurableExchangeName));
+ !exchangeNames.contains(nonDurableExchangeName));
//check the old exchange objects are not the same as the new exchanges
assertTrue(directExchangeName + " exchange NOT reloaded",
- registry.getExchange(directExchangeName) != oldExchanges.get(directExchangeName));
+ getVirtualHost().getExchange(directExchangeName) != oldExchanges.get(directExchangeName));
assertTrue(topicExchangeName + " exchange NOT reloaded",
- registry.getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName));
+ getVirtualHost().getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName));
// There should only be the original exchanges + our 2 recovered durable exchanges
assertEquals("Incorrect number of exchanges available",
- originalNumExchanges + 2, registry.getExchangeNames().size());
+ originalNumExchanges + 2, getVirtualHost().getExchanges().size());
}
/** Validates the Durable queues and their properties are as expected following recovery */
@@ -771,9 +773,9 @@ public class MessageStoreTest extends Qp
}
- private Map<AMQShortString, Exchange> createExchanges()
+ private Map<String, Exchange> createExchanges()
{
- Map<AMQShortString, Exchange> exchanges = new HashMap<AMQShortString, Exchange>();
+ Map<String, Exchange> exchanges = new HashMap<String, Exchange>();
//Register non-durable DirectExchange
exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false));
@@ -785,32 +787,19 @@ public class MessageStoreTest extends Qp
return exchanges;
}
- private Exchange createExchange(ExchangeType<?> type, AMQShortString name, boolean durable)
+ private Exchange createExchange(ExchangeType<?> type, String name, boolean durable)
{
Exchange exchange = null;
try
{
- exchange = type.newInstance(UUIDGenerator.generateRandomUUID(), getVirtualHost(), name, durable, 0, false);
+ exchange = getVirtualHost().createExchange(null, name, type.getName().toString(), durable, false, null);
}
catch (AMQException e)
{
fail(e.getMessage());
}
- try
- {
- getVirtualHost().getExchangeRegistry().registerExchange(exchange);
- if (durable)
- {
- DurableConfigurationStoreHelper.createExchange(getVirtualHost().getDurableConfigurationStore(),
- exchange);
- }
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
return exchange;
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Fri Jul 5 15:30:53 2013
@@ -190,7 +190,7 @@ public class BrokerTestHelper
when(info.getExchange()).thenReturn(exchangeNameAsShortString);
when(info.getRoutingKey()).thenReturn(rouningKey);
- Exchange exchange = channel.getVirtualHost().getExchangeRegistry().getExchange(exchangeName);
+ Exchange exchange = channel.getVirtualHost().getExchange(exchangeName);
for (int count = 0; count < numberOfMessages; count++)
{
channel.setPublishFrame(info, exchange);
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Fri Jul 5 15:30:53 2013
@@ -20,12 +20,16 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collection;
import java.util.concurrent.ScheduledFuture;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
@@ -77,16 +81,6 @@ public class MockVirtualHost implements
return null;
}
- public ExchangeFactory getExchangeFactory()
- {
- return null;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return null;
- }
-
public int getHouseKeepingActiveCount()
{
return 0;
@@ -127,11 +121,56 @@ public class MockVirtualHost implements
return null;
}
+ @Override
+ public Exchange createExchange(UUID id,
+ String exchange,
+ String type,
+ boolean durable,
+ boolean autoDelete,
+ String alternateExchange) throws AMQException
+ {
+ return null;
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange, boolean force) throws AMQException
+ {
+ }
+
+ @Override
+ public Exchange getExchange(String name)
+ {
+ return null;
+ }
+
+ @Override
+ public Exchange getDefaultExchange()
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<Exchange> getExchanges()
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<ExchangeType<? extends Exchange>> getExchangeTypes()
+ {
+ return null;
+ }
+
public SecurityManager getSecurityManager()
{
return null;
}
+ @Override
+ public void addVirtualHostListener(VirtualHostListener listener)
+ {
+ }
+
public LinkRegistry getLinkRegistry(String remoteContainerId)
{
return null;
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java Fri Jul 5 15:30:53 2013
@@ -200,7 +200,7 @@ public class StandardVirtualHostTest ext
File config = writeConfigFile(vhostName, queueName, exchangeName, false, new String[]{"ping","pong"}, bindingArguments);
VirtualHost vhost = createVirtualHost(vhostName, config);
- Exchange exch = vhost.getExchangeRegistry().getExchange(getName() +".direct");
+ Exchange exch = vhost.getExchange(getName() +".direct");
Collection<Binding> bindings = exch.getBindings();
assertNotNull("Bindings cannot be null", bindings);
assertEquals("Unexpected number of bindings", 3, bindings.size());
@@ -245,10 +245,10 @@ public class StandardVirtualHostTest ext
AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName);
assertNotNull("queue should exist", queue);
- Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange();
+ Exchange defaultExch = vhost.getDefaultExchange();
assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue));
- Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exch = vhost.getExchange(exchangeName);
assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue));
for(String key: routingKeys)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org