You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/07/05 17:30:54 UTC
svn commit: r1500047 [1/2] - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/a...
Author: rgodfrey
Date: Fri Jul 5 15:30:53 2013
New Revision: 1500047
URL: http://svn.apache.org/r1500047
Log:
QPID-4973 : [Java Broker] Refactor DurableConfigurationStore interface to be in terms of ConfiguredObject rather than implementation classes
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Fri Jul 5 15:30:53 2013
@@ -70,7 +70,7 @@ public class BDBHAVirtualHost extends Ab
_messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT);
_messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
_messageStore.configureConfigStore(getName(),
recoveryHandler,
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Jul 5 15:30:53 2013
@@ -63,7 +63,6 @@ public abstract class AbstractExchange i
private Exchange _alternateExchange;
private boolean _durable;
- private int _ticket;
private VirtualHost _virtualHost;
@@ -109,14 +108,17 @@ public abstract class AbstractExchange i
return _type.getName();
}
- public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ public void initialise(UUID id,
+ VirtualHost host,
+ AMQShortString name,
+ boolean durable,
+ boolean autoDelete)
throws AMQException
{
_virtualHost = host;
_name = name;
_durable = durable;
_autoDelete = autoDelete;
- _ticket = ticket;
_id = id;
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
@@ -135,11 +137,6 @@ public abstract class AbstractExchange i
return _autoDelete;
}
- public int getTicket()
- {
- return _ticket;
- }
-
public void close() throws AMQException
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Fri Jul 5 15:30:53 2013
@@ -49,7 +49,6 @@ public class DefaultExchange implements
private UUID _id;
private VirtualHost _virtualHost;
- private int _ticket;
private static final Logger _logger = Logger.getLogger(DefaultExchange.class);
private final AtomicBoolean _closed = new AtomicBoolean();
@@ -62,12 +61,10 @@ public class DefaultExchange implements
VirtualHost host,
AMQShortString name,
boolean durable,
- int ticket,
boolean autoDelete) throws AMQException
{
_id = id;
_virtualHost = host;
- _ticket = ticket;
}
@Override
@@ -197,12 +194,6 @@ public class DefaultExchange implements
}
@Override
- public int getTicket()
- {
- return _ticket;
- }
-
- @Override
public void close() throws AMQException
{
if(_closed.compareAndSet(false,true))
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Fri Jul 5 15:30:53 2013
@@ -102,51 +102,45 @@ public class DefaultExchangeFactory impl
public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes()
{
- Collection<ExchangeType<? extends Exchange>> publicTypes =
+ Collection<ExchangeType<? extends Exchange>> publicTypes =
new ArrayList<ExchangeType<? extends Exchange>>();
publicTypes.addAll(_exchangeClassMap.values());
-
+
return publicTypes;
}
public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
- throws AMQException
+ throws AMQException
{
- return createExchange(new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0);
- }
- public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete)
- throws AMQException
- {
- return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0);
+ UUID id = UUIDGenerator.generateExchangeUUID(exchange, _host.getName());
+ return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete);
}
- public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable,
- boolean autoDelete, int ticket)
+ public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete)
throws AMQException
{
- UUID id = UUIDGenerator.generateExchangeUUID(exchange.asString(), _host.getName());
- return createExchange(id, exchange, type, durable, autoDelete, ticket);
+ return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete);
}
- public Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable,
- boolean autoDelete, int ticket)
+ private Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable,
+ boolean autoDelete)
throws AMQException
{
// Check access
if (!_host.getSecurityManager().authoriseCreateExchange(autoDelete, durable, exchange, null, null, null, type))
{
- String description = "Permission denied: exchange-name '" + exchange.asString() + "'";
+ String description = "Permission denied: exchange-name '" + exchange + "'";
throw new AMQSecurityException(description);
}
-
+
ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type);
if (exchType == null)
{
throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
}
-
- Exchange e = exchType.newInstance(id, _host, exchange, durable, ticket, autoDelete);
+
+ Exchange e = exchType.newInstance(id, _host, exchange, durable, autoDelete);
return e;
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Fri Jul 5 15:30:53 2013
@@ -24,12 +24,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -46,8 +44,7 @@ public class DefaultExchangeRegistry imp
/**
* Maps from exchange name to exchange instance
*/
- private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
- private ConcurrentMap<String, Exchange> _exchangeMapStr = new ConcurrentHashMap<String, Exchange>();
+ private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
private Exchange _defaultExchange;
private VirtualHost _host;
@@ -59,17 +56,17 @@ public class DefaultExchangeRegistry imp
_host = host;
}
- public void initialise() throws AMQException
+ public void initialise(ExchangeFactory exchangeFactory) throws AMQException
{
//create 'standard' exchanges:
- new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore());
+ new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore());
_defaultExchange = new DefaultExchange();
UUID defaultExchangeId =
UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName());
- _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false,0,false);
+ _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false, false);
}
@@ -80,8 +77,7 @@ public class DefaultExchangeRegistry imp
public void registerExchange(Exchange exchange) throws AMQException
{
- _exchangeMap.put(exchange.getNameShortString(), exchange);
- _exchangeMapStr.put(exchange.getNameShortString().toString(), exchange);
+ _exchangeMap.put(exchange.getNameShortString().toString(), exchange);
synchronized (_listeners)
{
for(RegistryChangeListener listener : _listeners)
@@ -102,12 +98,7 @@ public class DefaultExchangeRegistry imp
return _defaultExchange;
}
- public Collection<AMQShortString> getExchangeNames()
- {
- return _exchangeMap.keySet();
- }
-
- public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
+ public void unregisterExchange(String name, boolean inUse) throws AMQException
{
final Exchange exchange = _exchangeMap.get(name);
if (exchange == null)
@@ -123,13 +114,8 @@ public class DefaultExchangeRegistry imp
// TODO: check inUse argument
Exchange e = _exchangeMap.remove(name);
- _exchangeMapStr.remove(name.toString());
if (e != null)
{
- if (e.isDurable())
- {
- DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), e);
- }
e.close();
synchronized (_listeners)
@@ -147,11 +133,6 @@ public class DefaultExchangeRegistry imp
}
}
- public void unregisterExchange(String name, boolean inUse) throws AMQException
- {
- unregisterExchange(new AMQShortString(name), inUse);
- }
-
public Collection<Exchange> getExchanges()
{
return new ArrayList<Exchange>(_exchangeMap.values());
@@ -162,19 +143,6 @@ public class DefaultExchangeRegistry imp
_listeners.add(listener);
}
- public Exchange getExchange(AMQShortString name)
- {
- if ((name == null) || name.length() == 0)
- {
- return getDefaultExchange();
- }
- else
- {
- return _exchangeMap.get(name);
- }
-
- }
-
public Exchange getExchange(String name)
{
if ((name == null) || name.length() == 0)
@@ -183,17 +151,15 @@ public class DefaultExchangeRegistry imp
}
else
{
- return _exchangeMapStr.get(name);
+ return _exchangeMap.get(name);
}
}
@Override
public void clearAndUnregisterMbeans()
{
- for (final AMQShortString exchangeName : getExchangeNames())
+ for (final Exchange exchange : getExchanges())
{
- final Exchange exchange = getExchange(exchangeName);
-
//TODO: this is a bit of a hack, what if the listeners aren't aware
//that we are just unregistering the MBean because of HA, and aren't
//actually removing the exchange as such.
@@ -206,7 +172,6 @@ public class DefaultExchangeRegistry imp
}
}
_exchangeMap.clear();
- _exchangeMapStr.clear();
}
@Override
@@ -237,7 +202,7 @@ public class DefaultExchangeRegistry imp
{
return true;
}
- Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeFactory().getRegisteredTypes();
+ Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeTypes();
for (ExchangeType<? extends Exchange> type : registeredTypes)
{
if (type.getDefaultExchangeName().toString().equals(name))
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java Fri Jul 5 15:30:53 2013
@@ -36,13 +36,12 @@ public class DirectExchangeType implemen
}
public DirectExchange newInstance(UUID id, VirtualHost host,
- AMQShortString name,
- boolean durable,
- int ticket,
- boolean autoDelete) throws AMQException
+ AMQShortString name,
+ boolean durable,
+ boolean autoDelete) throws AMQException
{
DirectExchange exch = new DirectExchange();
- exch.initialise(id, host,name,durable,ticket,autoDelete);
+ exch.initialise(id, host,name,durable, autoDelete);
return exch;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Fri Jul 5 15:30:53 2013
@@ -39,7 +39,7 @@ import java.util.UUID;
public interface Exchange extends ExchangeReferrer
{
- void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete)
throws AMQException;
@@ -60,8 +60,6 @@ public interface Exchange extends Exchan
*/
boolean isAutoDelete();
- int getTicket();
-
Exchange getAlternateExchange();
void setAlternateExchange(Exchange exchange);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Fri Jul 5 15:30:53 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,19 +30,13 @@ import java.util.UUID;
public interface ExchangeFactory
{
- Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
- int ticket)
- throws AMQException;
Collection<ExchangeType<? extends Exchange>> getRegisteredTypes();
-
+
Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes();
Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
- Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable,
- boolean autoDelete, int ticket)
- throws AMQException;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java Fri Jul 5 15:30:53 2013
@@ -33,17 +33,17 @@ public class ExchangeInitialiser
{
for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
{
- define (registry, factory, type.getDefaultExchangeName(), type.getName(), store);
+ define (registry, factory, type.getDefaultExchangeName().toString(), type.getName().toString(), store);
}
}
private void define(ExchangeRegistry r, ExchangeFactory f,
- AMQShortString name, AMQShortString type, DurableConfigurationStore store) throws AMQException
+ String name, String type, DurableConfigurationStore store) throws AMQException
{
if(r.getExchange(name)== null)
{
- Exchange exchange = f.createExchange(name, type, true, false, 0);
+ Exchange exchange = f.createExchange(name, type, true, false);
r.registerExchange(exchange);
if(exchange.isDurable())
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Fri Jul 5 15:30:53 2013
@@ -31,27 +31,19 @@ public interface ExchangeRegistry
{
void registerExchange(Exchange exchange) throws AMQException;
- /**
- * Unregister an exchange
- * @param name name of the exchange to delete
- * @param inUse if true, do NOT delete the exchange if it is in use (has queues bound to it)
- * @throws ExchangeInUseException when the exchange cannot be deleted because it is in use
- * @throws AMQException
- */
- void unregisterExchange(AMQShortString name, boolean inUse) throws ExchangeInUseException, AMQException;
-
- Exchange getExchange(AMQShortString name);
-
- void setDefaultExchange(Exchange exchange);
-
Exchange getDefaultExchange();
- Collection<AMQShortString> getExchangeNames();
-
- void initialise() throws AMQException;
+ void initialise(ExchangeFactory exchangeFactory) throws AMQException;
Exchange getExchange(String exchangeName);
+ /**
+ * Unregister an exchange
+ * @param exchange name of the exchange to delete
+ * @param ifUnused if true, do NOT delete the exchange if it is in use (has queues bound to it)
+ * @throws ExchangeInUseException when the exchange cannot be deleted because it is in use
+ * @throws AMQException
+ */
void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException;
void clearAndUnregisterMbeans();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java Fri Jul 5 15:30:53 2013
@@ -36,11 +36,11 @@ public class FanoutExchangeType implemen
}
public FanoutExchange newInstance(UUID id, VirtualHost host, AMQShortString name,
- boolean durable, int ticket, boolean autoDelete)
+ boolean durable, boolean autoDelete)
throws AMQException
{
FanoutExchange exch = new FanoutExchange();
- exch.initialise(id, host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, autoDelete);
return exch;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java Fri Jul 5 15:30:53 2013
@@ -35,12 +35,12 @@ public class HeadersExchangeType impleme
return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
}
- public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket,
- boolean autoDelete) throws AMQException
+ public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable,
+ boolean autoDelete) throws AMQException
{
HeadersExchange exch = new HeadersExchange();
- exch.initialise(id, host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, autoDelete);
return exch;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java Fri Jul 5 15:30:53 2013
@@ -36,13 +36,12 @@ public class TopicExchangeType implement
}
public TopicExchange newInstance(UUID id, VirtualHost host,
- AMQShortString name,
- boolean durable,
- int ticket,
- boolean autoDelete) throws AMQException
+ AMQShortString name,
+ boolean durable,
+ boolean autoDelete) throws AMQException
{
TopicExchange exch = new TopicExchange();
- exch.initialise(id, host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, autoDelete);
return exch;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Fri Jul 5 15:30:53 2013
@@ -67,7 +67,7 @@ public class BasicPublishMethodHandler i
}
VirtualHost vHost = session.getVirtualHost();
- Exchange exch = vHost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exch = vHost.getExchange(exchangeName.toString());
// if the exchange does not exist we raise a channel exception
if (exch == null)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Fri Jul 5 15:30:53 2013
@@ -88,7 +88,7 @@ public class ExchangeBoundHandler implem
{
throw new AMQException("Exchange exchange must not be null");
}
- Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exchange = virtualHost.getExchange(exchangeName.toString());
ExchangeBoundOkBody response;
if (exchange == null)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Fri Jul 5 15:30:53 2013
@@ -32,12 +32,11 @@ import org.apache.qpid.framing.MethodReg
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
@@ -59,8 +58,6 @@ public class ExchangeDeclareHandler impl
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
final AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
@@ -73,57 +70,63 @@ public class ExchangeDeclareHandler impl
_logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName);
}
- synchronized(exchangeRegistry)
+ Exchange exchange;
+
+ if (body.getPassive())
{
- Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+ exchange = virtualHost.getExchange(exchangeName == null ? null : exchangeName.toString());
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ }
+ else if (!exchange.getTypeShortString().equals(body.getType()) && !(body.getType() == null || body.getType().length() ==0))
+ {
+
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
+ exchangeName + " of type " + exchange.getTypeShortString()
+ + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+ }
- if (exchange == null)
+ }
+ else
+ {
+ try
{
- if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
- }
- else if(exchangeName.startsWith("amq."))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix 'amq.'.");
- }
- else if(exchangeName.startsWith("qpid."))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix 'qpid.'.");
- }
- else
+ exchange = virtualHost.createExchange(null,
+ exchangeName == null ? null : exchangeName.intern().toString(),
+ body.getType() == null ? null : body.getType().intern().toString(),
+ body.getDurable(),
+ body.getAutoDelete(),
+ null);
+
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Attempt to declare exchange: " + exchangeName +
+ " which begins with reserved prefix.");
+
+ }
+ catch(ExchangeExistsException e)
+ {
+ exchange = e.getExistingExchange();
+ if(!exchange.getTypeShortString().equals(body.getType()))
{
- try
- {
- exchange = exchangeFactory.createExchange(exchangeName == null ? null : exchangeName.intern(),
- body.getType() == null ? null : body.getType().intern(),
- body.getDurable(),
- body.getAutoDelete(), body.getTicket());
- exchangeRegistry.registerExchange(exchange);
-
- if (exchange.isDurable())
- {
- DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(),
- exchange);
- }
- }
- catch(AMQUnknownExchangeType e)
- {
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
- }
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getTypeShortString()
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
}
}
- else if (!exchange.getTypeShortString().equals(body.getType()) && !((body.getType() == null || body.getType().length() ==0) && body.getPassive()))
+ catch(AMQUnknownExchangeType e)
{
-
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
- exchangeName + " of type " + exchange.getTypeShortString() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
}
}
+
+
if(!body.getNowait())
{
MethodRegistry methodRegistry = session.getMethodRegistry();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Fri Jul 5 15:30:53 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,16 +21,21 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.ExecutionErrorCode;
public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
{
@@ -49,7 +54,6 @@ public class ExchangeDeleteHandler imple
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
final AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
@@ -58,14 +62,18 @@ public class ExchangeDeleteHandler imple
channel.sync();
try
{
- if(exchangeRegistry.getExchange(body.getExchange()) == null)
+ final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
+
+ final Exchange exchange = virtualHost.getExchange(exchangeName);
+ if(exchange == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange());
}
- exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
+
+ virtualHost.removeExchange(exchange, !body.getIfUnused());
ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
-
+
session.writeFrame(responseBody.generateFrame(channelId));
}
catch (ExchangeInUseException e)
@@ -73,5 +81,15 @@ public class ExchangeDeleteHandler imple
throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use");
// TODO: sort out consistent channel close mechanism that does all clean up etc.
}
+
+ catch (ExchangeIsAlternateException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+
+ }
+ catch (RequiredExchangeException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted");
+ }
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Fri Jul 5 15:30:53 2013
@@ -23,6 +23,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -32,7 +33,6 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -62,7 +62,6 @@ public class QueueBindHandler implements
{
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
AMQChannel channel = protocolConnection.getChannel(channelId);
@@ -103,10 +102,11 @@ public class QueueBindHandler implements
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
- final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
+ final Exchange exch = virtualHost.getExchange(exchangeName);
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.");
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Jul 5 15:30:53 2013
@@ -62,7 +62,6 @@ public class QueueDeclareHandler impleme
final AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
final AMQSessionModel session = protocolConnection.getChannel(channelId);
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Fri Jul 5 15:30:53 2013
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,16 +7,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
package org.apache.qpid.server.handler;
@@ -60,7 +60,6 @@ public class QueueUnbindHandler implemen
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -97,7 +96,7 @@ public class QueueUnbindHandler implemen
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
- final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ final Exchange exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
if (exch == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
@@ -112,7 +111,7 @@ public class QueueUnbindHandler implemen
exch.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments()));
}
-
+
if (_log.isInfoEnabled())
{
_log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java Fri Jul 5 15:30:53 2013
@@ -42,6 +42,8 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apache.qpid.server.exchange.Exchange.BindingListener
@@ -121,7 +123,7 @@ final class ExchangeAdapter extends Abst
return createBinding(bindingKey, queue, bindingArgs, attributes);
}
-
+
public org.apache.qpid.server.model.Binding createBinding(String bindingKey, Queue queue,
Map<String, Object> bindingArguments,
Map<String, Object> attributes)
@@ -165,21 +167,11 @@ final class ExchangeAdapter extends Abst
{
try
{
- ExchangeRegistry exchangeRegistry = _vhost.getVirtualHost().getExchangeRegistry();
- if (exchangeRegistry.isReservedExchangeName(getName()))
- {
- throw new UnsupportedOperationException("'" + getName() + "' is a reserved exchange and can't be deleted");
- }
-
- if(_exchange.hasReferrers())
- {
- throw new AMQException( AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", null);
- }
-
- synchronized(exchangeRegistry)
- {
- exchangeRegistry.unregisterExchange(getName(), false);
- }
+ _vhost.getVirtualHost().removeExchange(_exchange, true);
+ }
+ catch(RequiredExchangeException e)
+ {
+ throw new UnsupportedOperationException("'" + getName() + "' is a reserved exchange and can't be deleted");
}
catch(AMQException e)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Fri Jul 5 15:30:53 2013
@@ -81,11 +81,13 @@ import org.apache.qpid.server.txn.LocalT
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.plugin.VirtualHostFactory;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
+import org.apache.qpid.server.virtualhost.VirtualHostListener;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener,
- QueueRegistry.RegistryChangeListener,
- IConnectionRegistry.RegistryChangeListener
+public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener
{
private static final Logger LOGGER = Logger.getLogger(VirtualHostAdapter.class);
@@ -184,7 +186,7 @@ public final class VirtualHostAdapter ex
private void populateExchanges()
{
Collection<org.apache.qpid.server.exchange.Exchange> actualExchanges =
- _virtualHost.getExchangeRegistry().getExchanges();
+ _virtualHost.getExchanges();
synchronized (_exchangeAdapters)
{
@@ -296,31 +298,81 @@ public final class VirtualHostAdapter ex
try
{
- ExchangeRegistry exchangeRegistry = _virtualHost.getExchangeRegistry();
- if (exchangeRegistry.isReservedExchangeName(name))
+ String alternateExchange = null;
+ if(attributes.containsKey(Exchange.ALTERNATE_EXCHANGE))
{
- throw new UnsupportedOperationException("'" + name + "' is a reserved exchange name");
- }
- synchronized(exchangeRegistry)
- {
- org.apache.qpid.server.exchange.Exchange exchange = exchangeRegistry.getExchange(name);
- if (exchange != null)
+ Object altExchangeObject = attributes.get(Exchange.ALTERNATE_EXCHANGE);
+ if(altExchangeObject instanceof Exchange)
{
- throw new IllegalArgumentException("Exchange with name '" + name + "' already exists");
+ alternateExchange = ((Exchange) altExchangeObject).getName();
}
- exchange = _virtualHost.getExchangeFactory().createExchange(name, type, durable,
- lifetime == LifetimePolicy.AUTO_DELETE);
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
- if(durable)
+ else if(altExchangeObject instanceof UUID)
{
- DurableConfigurationStoreHelper.createExchange(_virtualHost.getDurableConfigurationStore(),
- exchange);
+ for(Exchange ex : getExchanges())
+ {
+ if(altExchangeObject.equals(ex.getId()))
+ {
+ alternateExchange = ex.getName();
+ break;
+ }
+ }
}
- synchronized (_exchangeAdapters)
+ else if(altExchangeObject instanceof String)
{
- return _exchangeAdapters.get(exchange);
+
+ for(Exchange ex : getExchanges())
+ {
+ if(altExchangeObject.equals(ex.getName()))
+ {
+ alternateExchange = ex.getName();
+ break;
+ }
+ }
+ if(alternateExchange == null)
+ {
+ try
+ {
+ UUID id = UUID.fromString(altExchangeObject.toString());
+ for(Exchange ex : getExchanges())
+ {
+ if(id.equals(ex.getId()))
+ {
+ alternateExchange = ex.getName();
+ break;
+ }
+ }
+ }
+ catch(IllegalArgumentException e)
+ {
+ // ignore
+ }
+
+ }
}
}
+ org.apache.qpid.server.exchange.Exchange exchange = _virtualHost.createExchange(null,
+ name,
+ type,
+ durable,
+ lifetime == LifetimePolicy.AUTO_DELETE,
+ alternateExchange);
+ synchronized (_exchangeAdapters)
+ {
+ return _exchangeAdapters.get(exchange);
+ }
+
+ }
+ catch(ExchangeExistsException e)
+ {
+ throw new IllegalArgumentException("Exchange with name '" + name + "' already exists");
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ throw new UnsupportedOperationException("'" + name + "' is a reserved exchange name");
+ }
+ catch(UnknownExchangeException e)
+ {
+ throw new IllegalArgumentException("Alternate Exchange with name '" + e.getExchangeName() + "' does not exist");
}
catch(AMQException e)
{
@@ -726,7 +778,7 @@ public final class VirtualHostAdapter ex
public Collection<String> getExchangeTypes()
{
Collection<ExchangeType<? extends org.apache.qpid.server.exchange.Exchange>> types =
- _virtualHost.getExchangeFactory().getRegisteredTypes();
+ _virtualHost.getExchangeTypes();
Collection<String> exchangeTypes = new ArrayList<String>();
@@ -884,7 +936,7 @@ public final class VirtualHostAdapter ex
if(SUPPORTED_EXCHANGE_TYPES.equals(name))
{
List<String> types = new ArrayList<String>();
- for(@SuppressWarnings("rawtypes") ExchangeType type : _virtualHost.getExchangeFactory().getRegisteredTypes())
+ for(@SuppressWarnings("rawtypes") ExchangeType type : _virtualHost.getExchangeTypes())
{
types.add(type.getName().asString());
}
@@ -1009,7 +1061,7 @@ public final class VirtualHostAdapter ex
}
else if(VirtualHost.EXCHANGE_COUNT.equals(name))
{
- return _vhost.getExchangeRegistry().getExchanges().size();
+ return _vhost.getExchanges().size();
}
else if(VirtualHost.CONNECTION_COUNT.equals(name))
{
@@ -1127,11 +1179,9 @@ public final class VirtualHostAdapter ex
virtualHostRegistry.registerVirtualHost(_virtualHost);
_statistics = new VirtualHostStatisticsAdapter(_virtualHost);
- _virtualHost.getQueueRegistry().addRegistryChangeListener(this);
+ _virtualHost.addVirtualHostListener(this);
populateQueues();
- _virtualHost.getExchangeRegistry().addRegistryChangeListener(this);
populateExchanges();
- _virtualHost.getConnectionRegistry().addRegistryChangeListener(this);
synchronized(_aliases)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java Fri Jul 5 15:30:53 2013
@@ -102,7 +102,7 @@ public class HeaderPropertiesConverter
exchangeName = "";
}
- Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exchange = vhost.getExchange(exchangeName);
String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString();
props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'"));
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java Fri Jul 5 15:30:53 2013
@@ -31,6 +31,6 @@ public interface ExchangeType<T extends
{
public AMQShortString getName();
public T newInstance(UUID id, VirtualHost host, AMQShortString name,
- boolean durable, int ticket, boolean autoDelete) throws AMQException;
+ boolean durable, boolean autoDelete) throws AMQException;
public AMQShortString getDefaultExchangeName();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Jul 5 15:30:53 2013
@@ -215,7 +215,7 @@ public class SendingLink_1_0 implements
List<Binding> bindingsToRemove = new ArrayList<Binding>();
for(Binding existingBinding : bindings)
{
- if(existingBinding.getExchange() != _vhost.getExchangeRegistry().getDefaultExchange()
+ if(existingBinding.getExchange() != _vhost.getDefaultExchange()
&& existingBinding.getExchange() != exchange)
{
bindingsToRemove.add(existingBinding);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Jul 5 15:30:53 2013
@@ -115,7 +115,7 @@ public class Session_1_0 implements Sess
}
else
{
- Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
+ Exchange exchg = _vhost.getExchange(addr);
if(exchg != null)
{
destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
@@ -244,7 +244,7 @@ public class Session_1_0 implements Sess
}
String addr = target.getAddress();
- Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
+ Exchange exchg = _vhost.getExchange(addr);
if(exchg != null)
{
destination = new ExchangeDestination(exchg, target.getDurable(),
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Fri Jul 5 15:30:53 2013
@@ -38,6 +38,7 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
@@ -300,25 +301,22 @@ public class AMQQueueFactory
final String dlExchangeName = getDeadLetterExchangeName(queueName);
final String dlQueueName = getDeadLetterQueueName(queueName);
- final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
Exchange dlExchange = null;
- synchronized(exchangeRegistry)
- {
- dlExchange = exchangeRegistry.getExchange(dlExchangeName);
-
- if(dlExchange == null)
- {
- dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
-
- exchangeRegistry.registerExchange(dlExchange);
+ final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName());
- //enter the dle in the persistent store
- DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(),
- dlExchange);
- }
+ try
+ {
+ dlExchange = virtualHost.createExchange(dlExchangeId,
+ dlExchangeName,
+ ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(),
+ true, false, null);
+ }
+ catch(ExchangeExistsException e)
+ {
+ // We're ok if the exchange already exists
+ dlExchange = e.getExistingExchange();
}
AMQQueue dlQueue = null;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1500047&r1=1500046&r2=1500047&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Fri Jul 5 15:30:53 2013
@@ -63,6 +63,11 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
@@ -185,7 +190,7 @@ public class ServerSessionDelegate exten
if(!method.hasQueue())
{
- exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
}
else
{
@@ -684,7 +689,6 @@ public class ServerSessionDelegate exten
{
String exchangeName = method.getExchange();
VirtualHost virtualHost = getVirtualHost(session);
- ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
//we must check for any unsupported arguments present and throw not-implemented
if(method.hasArguments())
@@ -697,114 +701,79 @@ public class ServerSessionDelegate exten
return;
}
}
- synchronized(exchangeRegistry)
+
+ if(method.getPassive())
{
Exchange exchange = getExchange(session, exchangeName);
- if(method.getPassive())
+ if(exchange == null)
{
- if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
- }
- else
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
+ }
+ else
+ {
+ if (!exchange.getTypeShortString().toString().equals(method.getType())
+ && (method.getType() != null && method.getType().length() > 0))
{
- if (!exchange.getTypeShortString().toString().equals(method.getType())
- && (method.getType() != null && method.getType().length() > 0))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + ".");
- }
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + ".");
}
}
- else
+ }
+ else
+ {
+
+ try
{
- if (exchange == null)
+ virtualHost.createExchange(null,
+ method.getExchange(),
+ method.getType(),
+ method.getDurable(),
+ method.getAutoDelete(),
+ method.getAlternateExchange());
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ + exchangeName + " which begins with reserved name or prefix.");
+ }
+ catch(UnknownExchangeException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND,
+ "Unknown alternate exchange " + e.getExchangeName());
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
+ }
+ catch(ExchangeExistsException e)
+ {
+ Exchange exchange = e.getExistingExchange();
+ if(!exchange.getTypeShortString().toString().equals(method.getType()))
{
- if (exchangeName.startsWith("amq."))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
- + exchangeName + " which begins with reserved prefix 'amq.'.");
- }
- else if (exchangeName.startsWith("qpid."))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
- + exchangeName + " which begins with reserved prefix 'qpid.'.");
- }
- else
- {
- ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
- try
- {
- exchange = exchangeFactory.createExchange(method.getExchange(),
- method.getType(),
- method.getDurable(),
- method.getAutoDelete());
- String alternateExchangeName = method.getAlternateExchange();
- boolean validAlternate;
- if(alternateExchangeName != null && alternateExchangeName.length() != 0)
- {
- Exchange alternate = getExchange(session, alternateExchangeName);
- if(alternate == null)
- {
- validAlternate = false;
- }
- else
- {
- exchange.setAlternateExchange(alternate);
- validAlternate = true;
- }
- }
- else
- {
- validAlternate = true;
- }
- if(validAlternate)
- {
- if (exchange.isDurable())
- {
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- DurableConfigurationStoreHelper.createExchange(store, exchange);
- }
- exchangeRegistry.registerExchange(exchange);
- }
- else
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND,
- "Unknown alternate exchange " + alternateExchangeName);
- }
- }
- catch(AMQUnknownExchangeType e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot declare exchange '" + exchangeName);
- }
- }
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to redeclare exchange: " + exchangeName
+ + " of type " + exchange.getTypeShortString()
+ + " to " + method.getType() +".");
}
- else
+ else if(method.hasAlternateExchange()
+ && (exchange.getAlternateExchange() == null ||
+ !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
{
- if(!exchange.getTypeShortString().toString().equals(method.getType()))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to redeclare exchange: " + exchangeName
- + " of type " + exchange.getTypeShortString()
- + " to " + method.getType() +".");
- }
- else if(method.hasAlternateExchange()
- && (exchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to change alternate exchange of: " + exchangeName
- + " from " + exchange.getAlternateExchange()
- + " to " + method.getAlternateExchange() +".");
- }
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to change alternate exchange of: " + exchangeName
+ + " from " + exchange.getAlternateExchange()
+ + " to " + method.getAlternateExchange() +".");
}
}
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot declare exchange '" + exchangeName);
+ }
+
+
}
+
}
// TODO decouple AMQException and AMQConstant error codes
@@ -841,32 +810,25 @@ public class ServerSessionDelegate exten
private Exchange getExchange(Session session, String exchangeName)
{
- ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
- return exchangeRegistry.getExchange(exchangeName);
- }
-
- private ExchangeRegistry getExchangeRegistry(Session session)
- {
- VirtualHost virtualHost = getVirtualHost(session);
- return virtualHost.getExchangeRegistry();
-
+ return getVirtualHost(session).getExchange(exchangeName);
}
private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
{
- final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+ VirtualHost virtualHost = getVirtualHost(ssn);
+
Exchange exchange;
if(xfr.hasDestination())
{
- exchange = exchangeRegistry.getExchange(xfr.getDestination());
+ exchange = virtualHost.getExchange(xfr.getDestination());
if(exchange == null)
{
- exchange = exchangeRegistry.getDefaultExchange();
+ exchange = virtualHost.getDefaultExchange();
}
}
else
{
- exchange = exchangeRegistry.getDefaultExchange();
+ exchange = virtualHost.getDefaultExchange();
}
return exchange;
}
@@ -888,7 +850,6 @@ public class ServerSessionDelegate exten
public void exchangeDelete(Session session, ExchangeDelete method)
{
VirtualHost virtualHost = getVirtualHost(session);
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
try
{
@@ -904,29 +865,23 @@ public class ServerSessionDelegate exten
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "No such exchange '" + method.getExchange() + "'");
}
- else if(exchange.hasReferrers())
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
- }
- else if(isStandardExchange(exchange, virtualHost.getExchangeFactory().getRegisteredTypes()))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted");
- }
else
{
- exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
-
- if (exchange.isDurable() && !exchange.isAutoDelete())
- {
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- DurableConfigurationStoreHelper.removeExchange(store, exchange);
- }
+ virtualHost.removeExchange(exchange, !method.getIfUnused());
}
}
catch (ExchangeInUseException e)
{
exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use");
}
+ catch (ExchangeIsAlternateException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ }
+ catch (RequiredExchangeException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted");
+ }
catch (AMQException e)
{
exception(session, method, e, "Cannot delete exchange '" + method.getExchange() );
@@ -982,7 +937,6 @@ public class ServerSessionDelegate exten
{
VirtualHost virtualHost = getVirtualHost(session);
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
if (!method.hasQueue())
@@ -1002,7 +956,7 @@ public class ServerSessionDelegate exten
method.setBindingKey(method.getQueue());
}
AMQQueue queue = queueRegistry.getQueue(method.getQueue());
- Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ Exchange exchange = virtualHost.getExchange(method.getExchange());
if(queue == null)
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
@@ -1045,7 +999,6 @@ public class ServerSessionDelegate exten
public void exchangeUnbind(Session session, ExchangeUnbind method)
{
VirtualHost virtualHost = getVirtualHost(session);
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
if (!method.hasQueue())
@@ -1063,7 +1016,7 @@ public class ServerSessionDelegate exten
else
{
AMQQueue queue = queueRegistry.getQueue(method.getQueue());
- Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ Exchange exchange = virtualHost.getExchange(method.getExchange());
if(queue == null)
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
@@ -1091,11 +1044,12 @@ public class ServerSessionDelegate exten
{
ExchangeBoundResult result = new ExchangeBoundResult();
+ VirtualHost virtualHost = getVirtualHost(session);
Exchange exchange;
AMQQueue queue;
if(method.hasExchange())
{
- exchange = getExchange(session, method.getExchange());
+ exchange = virtualHost.getExchange(method.getExchange());
if(exchange == null)
{
@@ -1104,7 +1058,7 @@ public class ServerSessionDelegate exten
}
else
{
- exchange = getExchangeRegistry(session).getDefaultExchange();
+ exchange = virtualHost.getDefaultExchange();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org