You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/02/20 17:20:46 UTC
svn commit: r509628 [1/2] - in /incubator/qpid/trunk/qpid:
gentools/templ.java/ java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/configuration/
java/broker/src/main/java/org/apache/qpid/server/exchange/...
Author: rgodfrey
Date: Tue Feb 20 08:20:41 2007
New Revision: 509628
URL: http://svn.apache.org/viewvc?view=rev&rev=509628
Log:
QPID-325 : Persist durable exchange information in the store
QPID-318 : Remove hardcoding of version numbers (as it applies to the store)
Added:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
Modified:
incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
Modified: incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl (original)
+++ incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl Tue Feb 20 08:20:41 2007
@@ -48,6 +48,8 @@
static
{
%{CLIST} ${reg_map_put_method}
+
+ configure();
}
public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
@@ -126,5 +128,26 @@
}
+
+ private static void configure()
+ {
+ for(int i = 0 ; i < _specificRegistries.length; i++)
+ {
+ VersionSpecificRegistry[] registries = _specificRegistries[i];
+ if(registries != null)
+ {
+ for(int j = 0 ; j < registries.length; j++)
+ {
+ VersionSpecificRegistry registry = registries[j];
+
+ if(registry != null)
+ {
+ registry.configure();
+ }
+ }
+ }
+ }
+
+ }
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Feb 20 08:20:41 2007
@@ -33,17 +33,16 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -202,9 +201,11 @@
}
- public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
+ public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
{
- _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), publishBody,
+
+
+ _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
_txnContext);
// TODO: used in clustering only I think (RG)
_currentMessage.setPublisher(publisher);
@@ -252,7 +253,7 @@
// returns true iff the message was delivered (i.e. if all data was
// received
- if (_currentMessage.addContentBodyFrame(_storeContext, contentBody))
+ if (_currentMessage.addContentBodyFrame(_storeContext, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody)))
{
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Tue Feb 20 08:20:41 2007
@@ -198,11 +198,17 @@
for(Object routingKeyNameObj : routingKeys)
{
AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
- exchange.registerQueue(routingKey, queue, null);
+
+
+ queue.bind(routingKey, null, exchange);
- queue.bind(routingKey, exchange);
_logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
+ }
+
+ if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
+ {
+ queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Tue Feb 20 08:20:41 2007
@@ -28,6 +28,8 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
public class DefaultExchangeRegistry implements ExchangeRegistry
{
@@ -39,23 +41,32 @@
private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
private Exchange _defaultExchange;
+ private VirtualHost _host;
- public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
+ public DefaultExchangeRegistry(VirtualHost host)
{
//create 'standard' exchanges:
- try
- {
- new ExchangeInitialiser().initialise(exchangeFactory, this);
- }
- catch(AMQException e)
- {
- _log.error("Failed to initialise exchanges: ", e);
- }
+ _host = host;
+
}
- public void registerExchange(Exchange exchange)
+ public void initialise() throws AMQException
+ {
+ new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _host.getMessageStore();
+ }
+
+ public void registerExchange(Exchange exchange) throws AMQException
{
_exchangeMap.put(exchange.getName(), exchange);
+ if(exchange.isDurable())
+ {
+ getMessageStore().createExchange(exchange);
+ }
}
public void setDefaultExchange(Exchange exchange)
@@ -74,6 +85,10 @@
Exchange e = _exchangeMap.remove(name);
if (e != null)
{
+ if(e.isDurable())
+ {
+ getMessageStore().removeExchange(e);
+ }
e.close();
}
else
@@ -102,7 +117,7 @@
*/
public void routeContent(AMQMessage payload) throws AMQException
{
- final AMQShortString exchange = payload.getPublishBody().exchange;
+ final AMQShortString exchange = payload.getMessagePublishInfo().getExchange();
final Exchange exch = getExchange(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
// the BasicPublish being received (where the exchange is validated) and the final
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Tue Feb 20 08:20:41 2007
@@ -43,6 +43,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
@@ -126,8 +127,7 @@
try
{
- registerQueue(new AMQShortString(binding), queue, null);
- queue.bind(new AMQShortString(binding), DestNameExchange.this);
+ queue.bind(new AMQShortString(binding), null, DestNameExchange.this);
}
catch (AMQException ex)
{
@@ -170,7 +170,7 @@
}
}
- public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert routingKey != null;
@@ -184,13 +184,13 @@
public void route(AMQMessage payload) throws AMQException
{
- final BasicPublishBody publishBody = payload.getPublishBody();
- final AMQShortString routingKey = publishBody.routingKey;
+ final MessagePublishInfo info = payload.getMessagePublishInfo();
+ final AMQShortString routingKey = info.getRoutingKey();
final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
if (queues == null || queues.isEmpty())
{
String msg = "Routing key " + routingKey + " is not known to " + this;
- if (publishBody.mandatory)
+ if (info.isMandatory())
{
throw new NoRouteException(msg, payload);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Tue Feb 20 08:20:41 2007
@@ -45,6 +45,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
@@ -125,8 +126,7 @@
try
{
- registerQueue(new AMQShortString(binding), queue, null);
- queue.bind(new AMQShortString(binding), DestWildExchange.this);
+ queue.bind(new AMQShortString(binding), null, DestWildExchange.this);
}
catch (AMQException ex)
{
@@ -168,9 +168,9 @@
public void route(AMQMessage payload) throws AMQException
{
- BasicPublishBody publishBody = payload.getPublishBody();
+ MessagePublishInfo info = payload.getMessagePublishInfo();
- final AMQShortString routingKey = publishBody.routingKey;
+ final AMQShortString routingKey = info.getRoutingKey();
List<AMQQueue> queues = _routingKey2queues.get(routingKey);
// if we have no registered queues we have nothing to do
// TODO: add support for the immediate flag
@@ -221,7 +221,7 @@
return !_routingKey2queues.isEmpty();
}
- public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert routingKey != null;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Tue Feb 20 08:20:41 2007
@@ -47,7 +47,7 @@
void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
- void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException;
+ void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
void route(AMQMessage message) throws AMQException;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Tue Feb 20 08:20:41 2007
@@ -26,7 +26,7 @@
public interface ExchangeRegistry extends MessageRouter
{
- void registerExchange(Exchange exchange);
+ void registerExchange(Exchange exchange) throws AMQException;
/**
* Unregister an exchange
@@ -42,4 +42,6 @@
void setDefaultExchange(Exchange exchange);
Exchange getDefaultExchange();
+
+ void initialise() throws AMQException;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Feb 20 08:20:41 2007
@@ -19,8 +19,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
@@ -98,9 +98,8 @@
}
try
- {
- registerQueue(new AMQShortString(binding), queue, null);
- queue.bind(new AMQShortString(binding), FanoutExchange.this);
+ {
+ queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
{
@@ -144,10 +143,10 @@
}
}
- public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
- assert routingKey != null;
+
if (!_queues.remove(queue))
{
@@ -158,12 +157,12 @@
public void route(AMQMessage payload) throws AMQException
{
- final BasicPublishBody publishBody = payload.getPublishBody();
- final AMQShortString routingKey = publishBody.routingKey;
+ final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
+ final AMQShortString routingKey = publishInfo.getRoutingKey();
if (_queues == null || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishBody.mandatory)
+ if (publishInfo.isMandatory())
{
throw new NoRouteException(msg, payload);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Feb 20 08:20:41 2007
@@ -200,10 +200,10 @@
_bindings.add(new Registration(new HeadersBinding(args), queue));
}
- public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
_logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName());
- _bindings.remove(new Registration(null, queue));
+ _bindings.remove(new Registration(new HeadersBinding(args), queue));
}
public void route(AMQMessage payload) throws AMQException
@@ -232,7 +232,7 @@
String msg = "Exchange " + getName() + ": message not routable.";
- if (payload.getPublishBody().mandatory)
+ if (payload.getMessagePublishInfo().isMandatory())
{
throw new NoRouteException(msg, payload);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Tue Feb 20 08:20:41 2007
@@ -27,6 +27,7 @@
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
@@ -85,7 +86,8 @@
throw body.getChannelNotFoundException(evt.getChannelId());
}
- channel.setPublishFrame(body, session);
+ MessagePublishInfo info = session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
+ channel.setPublishFrame(info, session);
}
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Tue Feb 20 08:20:41 2007
@@ -98,8 +98,8 @@
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
}
try
- {
- exch.registerQueue(body.routingKey, queue, body.arguments);
+ {
+ queue.bind(body.routingKey, body.arguments, exch);
}
catch (AMQInvalidRoutingKeyException rke)
{
@@ -109,7 +109,7 @@
{
throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
}
- queue.bind(body.routingKey, exch);
+
if (_log.isInfoEnabled())
{
_log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Feb 20 08:20:41 2007
@@ -108,8 +108,8 @@
if (autoRegister)
{
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
- defaultExchange.registerQueue(body.queue, queue, null);
- queue.bind(body.queue, defaultExchange);
+
+ queue.bind(body.queue, null, defaultExchange);
_log.info("Queue " + body.queue + " bound to default exchange");
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java Tue Feb 20 08:20:41 2007
@@ -41,6 +41,9 @@
private void define(ExchangeRegistry r, ExchangeFactory f,
AMQShortString name, AMQShortString type) throws AMQException
{
- r.registerExchange(f.createExchange(name, type, true, false, 0));
+ if(r.getExchange(name)== null)
+ {
+ r.registerExchange(f.createExchange(name, type, true, false, 0));
+ }
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Feb 20 08:20:41 2007
@@ -27,20 +27,13 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicGetOkBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.SmallCompositeAMQDataBlock;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.txn.TransactionalContext;
/**
@@ -98,10 +91,12 @@
private int _channel;
private int _index = -1;
+ private AMQProtocolSession _protocolSession;
- private BodyFrameIterator(int channel)
+ private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
_channel = channel;
+ _protocolSession = protocolSession;
}
public boolean hasNext()
@@ -121,8 +116,9 @@
{
try
{
- ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
- return ContentBody.createAMQFrame(_channel, cb);
+
+ AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+ return new AMQFrame(_channel, cb);
}
catch (AMQException e)
{
@@ -132,6 +128,11 @@
}
+ private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolSession.getRegistry().getProtocolVersionMethodConverter();
+ }
+
public void remove()
{
throw new UnsupportedOperationException();
@@ -143,7 +144,7 @@
return _txnContext.getStoreContext();
}
- private class BodyContentIterator implements Iterator<ContentBody>
+ private class BodyContentIterator implements Iterator<ContentChunk>
{
private int _index = -1;
@@ -161,11 +162,11 @@
}
}
- public ContentBody next()
+ public ContentChunk next()
{
try
{
- return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
+ return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index);
}
catch (AMQException e)
{
@@ -179,13 +180,13 @@
}
}
- public AMQMessage(Long messageId, BasicPublishBody publishBody,
+ public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext)
{
_messageId = messageId;
_txnContext = txnContext;
- _immediate = publishBody.immediate;
- _transientMessageData.setPublishBody(publishBody);
+ _immediate = info.isImmediate();
+ _transientMessageData.setMessagePublishInfo(info);
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
@@ -215,14 +216,14 @@
* Used in testing only. This allows the passing of the content header immediately
* on construction.
* @param messageId
- * @param publishBody
+ * @param info
* @param txnContext
* @param contentHeader
*/
- public AMQMessage(Long messageId, BasicPublishBody publishBody,
+ public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
{
- this(messageId, publishBody, txnContext);
+ this(messageId, info, txnContext);
setContentHeaderBody(contentHeader);
}
@@ -230,23 +231,23 @@
* Used in testing only. This allows the passing of the content header and some body fragments on
* construction.
* @param messageId
- * @param publishBody
+ * @param info
* @param txnContext
* @param contentHeader
* @param destinationQueues
* @param contentBodies
* @throws AMQException
*/
- public AMQMessage(Long messageId, BasicPublishBody publishBody,
+ public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext,
ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
- List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext,
+ List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext,
MessageHandleFactory messageHandleFactory) throws AMQException
{
- this(messageId, publishBody, txnContext, contentHeader);
+ this(messageId, info, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
routingComplete(messageStore, storeContext, messageHandleFactory);
- for (ContentBody cb : contentBodies)
+ for (ContentChunk cb : contentBodies)
{
addContentBodyFrame(storeContext, cb);
}
@@ -261,12 +262,12 @@
_transientMessageData = msg._transientMessageData;
}
- public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
+ public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
- return new BodyFrameIterator(channel);
+ return new BodyFrameIterator(protocolSession, channel);
}
- public Iterator<ContentBody> getContentBodyIterator()
+ public Iterator<ContentChunk> getContentBodyIterator()
{
return new BodyContentIterator();
}
@@ -311,11 +312,11 @@
}
}
- public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException
+ public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException
{
- _transientMessageData.addBodyLength(contentBody.getSize());
+ _transientMessageData.addBodyLength(contentChunk.getSize());
final boolean allContentReceived = isAllContentReceived();
- _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody, allContentReceived);
+ _messageHandle.addContentBodyFrame(storeContext, _messageId, contentChunk, allContentReceived);
if (allContentReceived)
{
deliver(storeContext);
@@ -502,16 +503,16 @@
}
}
- public BasicPublishBody getPublishBody() throws AMQException
+ public MessagePublishInfo getMessagePublishInfo() throws AMQException
{
- BasicPublishBody pb;
+ MessagePublishInfo pb;
if (_transientMessageData != null)
{
- pb = _transientMessageData.getPublishBody();
+ pb = _transientMessageData.getMessagePublishInfo();
}
else
{
- pb = _messageHandle.getPublishBody(getStoreContext(),_messageId);
+ pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId);
}
return pb;
}
@@ -554,7 +555,7 @@
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getPublishBody(),
+ _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(),
_transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
@@ -598,9 +599,9 @@
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can write out the deliver, header and body with a single network write.
//
- ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
- AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
protocolSession.writeFrame(compositeBlock);
@@ -610,8 +611,8 @@
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
- protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+ cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -641,9 +642,9 @@
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can write out the deliver, header and body with a single network write.
//
- ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
- AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
protocolSession.writeFrame(compositeBlock);
@@ -653,8 +654,8 @@
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
- protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+ cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -667,10 +668,10 @@
private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- BasicPublishBody pb = getPublishBody();
+ MessagePublishInfo pb = getMessagePublishInfo();
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
- deliveryTag, pb.exchange, _messageHandle.isRedelivered(),
- pb.routingKey);
+ deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
deliverFrame.writePayload(buf);
buf.flip();
@@ -680,14 +681,14 @@
private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
- BasicPublishBody pb = getPublishBody();
+ MessagePublishInfo pb = getMessagePublishInfo();
AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.exchange,
+ deliveryTag, pb.getExchange(),
queueSize,
_messageHandle.isRedelivered(),
- pb.routingKey);
+ pb.getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
getOkFrame.writePayload(buf);
buf.flip();
@@ -699,9 +700,9 @@
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
protocolSession.getProtocolMinorVersion(),
- getPublishBody().exchange,
+ getMessagePublishInfo().getExchange(),
replyCode, replyText,
- getPublishBody().routingKey);
+ getMessagePublishInfo().getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
returnFrame.writePayload(buf);
buf.flip();
@@ -716,7 +717,7 @@
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can write out the deliver, header and body with a single network write.
@@ -767,7 +768,7 @@
public void restoreTransientMessageData() throws AMQException
{
TransientMessageData transientMessageData = new TransientMessageData();
- transientMessageData.setPublishBody(getPublishBody());
+ transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
transientMessageData.setContentHeaderBody(getContentHeaderBody());
transientMessageData.addBodyLength(getContentHeaderBody().getSize());
_transientMessageData = transientMessageData;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Tue Feb 20 08:20:41 2007
@@ -21,10 +21,10 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
/**
* A pluggable way of getting message data. Implementations can provide intelligent caching for example or
@@ -53,11 +53,11 @@
* @return a content body
* @throws IllegalArgumentException if the index is invalid
*/
- ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
+ ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
- void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException;
+ void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) throws AMQException;
- BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException;
+ MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException;
boolean isRedelivered();
@@ -65,7 +65,7 @@
boolean isPersistent(StoreContext context, Long messageId) throws AMQException;
- void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
+ void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
throws AMQException;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Feb 20 08:20:41 2007
@@ -441,9 +441,24 @@
return _deliveryMgr.clearAllMessages(storeContext);
}
- public void bind(AMQShortString routingKey, Exchange exchange)
+ public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
- _bindings.addBinding(routingKey, exchange);
+ exchange.registerQueue(routingKey, this, arguments);
+ if(isDurable() && exchange.isDurable())
+ {
+ _virtualHost.getMessageStore().bindQueue(exchange,routingKey,this,arguments);
+ }
+ _bindings.addBinding(routingKey, arguments, exchange);
+ }
+
+ public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
+ {
+ exchange.deregisterQueue(routingKey, this, arguments);
+ if(isDurable() && exchange.isDurable())
+ {
+ _virtualHost.getMessageStore().unbindQueue(exchange,routingKey,this,arguments);
+ }
+ _bindings.remove(routingKey, arguments, exchange);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Feb 20 08:20:41 2007
@@ -44,6 +44,7 @@
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
@@ -322,16 +323,16 @@
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
// get message content
- Iterator<ContentBody> cBodies = msg.getContentBodyIterator();
+ Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
List<Byte> msgContent = new ArrayList<Byte>();
while (cBodies.hasNext())
{
- ContentBody body = cBodies.next();
+ ContentChunk body = cBodies.next();
if (body.getSize() != 0)
{
if (body.getSize() != 0)
{
- ByteBuffer slice = body.payload.slice();
+ ByteBuffer slice = body.getData().slice();
for (int j = 0; j < slice.limit(); j++)
{
msgContent.add(slice.get());
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Feb 20 08:20:41 2007
@@ -33,7 +33,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -114,11 +114,11 @@
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
{
- Iterator<ContentBody> it = msg.getContentBodyIterator();
+ Iterator<ContentChunk> it = msg.getContentBodyIterator();
while (it.hasNext())
{
- ContentBody cb = it.next();
- cb.reduceBufferToFit();
+ ContentChunk cb = it.next();
+ cb.reduceToFit();
}
}
@@ -493,7 +493,7 @@
{
_log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
}
- if (!msg.getPublishBody().immediate)
+ if (!msg.getMessagePublishInfo().isImmediate())
{
addMessageToQueue(msg);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Tue Feb 20 08:20:41 2007
@@ -26,6 +26,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
/**
@@ -35,42 +36,55 @@
*/
class ExchangeBindings
{
+ private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();
+
static class ExchangeBinding
{
- private final Exchange exchange;
- private final AMQShortString routingKey;
+ private final Exchange _exchange;
+ private final AMQShortString _routingKey;
+ private final FieldTable _arguments;
ExchangeBinding(AMQShortString routingKey, Exchange exchange)
{
- this.routingKey = routingKey;
- this.exchange = exchange;
+ this(routingKey, exchange,EMPTY_ARGUMENTS);
+ }
+
+ ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
+ {
+ _routingKey = routingKey;
+ _exchange = exchange;
+ _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
}
void unbind(AMQQueue queue) throws AMQException
{
- exchange.deregisterQueue(routingKey, queue);
+ _exchange.deregisterQueue(_routingKey, queue, _arguments);
}
public Exchange getExchange()
{
- return exchange;
+ return _exchange;
}
public AMQShortString getRoutingKey()
{
- return routingKey;
+ return _routingKey;
}
public int hashCode()
{
- return (exchange == null ? 0 : exchange.hashCode()) + (routingKey == null ? 0 : routingKey.hashCode());
+ return (_exchange == null ? 0 : _exchange.hashCode())
+ + (_routingKey == null ? 0 : _routingKey.hashCode())
+ + (_arguments == null ? 0 : _arguments.hashCode());
}
public boolean equals(Object o)
{
if (!(o instanceof ExchangeBinding)) return false;
ExchangeBinding eb = (ExchangeBinding) o;
- return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey);
+ return _exchange.equals(eb._exchange)
+ && _routingKey.equals(eb._routingKey)
+ && _arguments.equals(eb._arguments);
}
}
@@ -88,11 +102,18 @@
* are being tracked by the instance has been bound to the exchange
* @param exchange the exchange bound to
*/
- void addBinding(AMQShortString routingKey, Exchange exchange)
+ void addBinding(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
+ {
+ _bindings.add(new ExchangeBinding(routingKey, exchange, arguments ));
+ }
+
+
+ public void remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
{
- _bindings.add(new ExchangeBinding(routingKey, exchange));
+ _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments ));
}
+
/**
* Deregisters this queue from any exchange it has been bound to
*/
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Tue Feb 20 08:20:41 2007
@@ -25,9 +25,10 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.store.StoreContext;
/**
@@ -37,9 +38,9 @@
private ContentHeaderBody _contentHeaderBody;
- private BasicPublishBody _publishBody;
+ private MessagePublishInfo _messagePublishInfo;
- private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
+ private List<ContentChunk> _contentBodies = new LinkedList<ContentChunk>();
private boolean _redelivered;
@@ -64,7 +65,7 @@
return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -74,15 +75,15 @@
return _contentBodies.get(index);
}
- public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody)
+ public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
throws AMQException
{
_contentBodies.add(contentBody);
}
- public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
+ public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
{
- return _publishBody;
+ return _messagePublishInfo;
}
public boolean isRedelivered()
@@ -106,15 +107,15 @@
/**
* This is called when all the content has been received.
- * @param publishBody
+ * @param messagePublishInfo
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
- _publishBody = publishBody;
+ _messagePublishInfo = messagePublishInfo;
_contentHeaderBody = contentHeaderBody;
_arrivalTime = System.currentTimeMillis();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java Tue Feb 20 08:20:41 2007
@@ -16,8 +16,8 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
/**
* Encapsulates a publish body and a content header. In the context of the message store these are treated as a
@@ -25,7 +25,7 @@
*/
public class MessageMetaData
{
- private BasicPublishBody _publishBody;
+ private MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
@@ -33,15 +33,15 @@
private long _arrivalTime;
- public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+ public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
{
this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
}
- public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+ public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
{
_contentHeaderBody = contentHeaderBody;
- _publishBody = publishBody;
+ _messagePublishInfo = publishBody;
_contentChunkCount = contentChunkCount;
_arrivalTime = arrivalTime;
}
@@ -66,14 +66,14 @@
_contentHeaderBody = contentHeaderBody;
}
- public BasicPublishBody getPublishBody()
+ public MessagePublishInfo getMessagePublishInfo()
{
- return _publishBody;
+ return _messagePublishInfo;
}
- public void setPublishBody(BasicPublishBody publishBody)
+ public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
{
- _publishBody = publishBody;
+ _messagePublishInfo = messagePublishInfo;
}
public long getArrivalTime()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Tue Feb 20 08:20:41 2007
@@ -21,8 +21,8 @@
import java.util.List;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
/**
@@ -40,7 +40,7 @@
* Stored temporarily until the header has been received at which point it is used when
* constructing the handle
*/
- private BasicPublishBody _publishBody;
+ private MessagePublishInfo _messagePublishInfo;
/**
* Also stored temporarily.
@@ -59,14 +59,14 @@
*/
private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
- public BasicPublishBody getPublishBody()
+ public MessagePublishInfo getMessagePublishInfo()
{
- return _publishBody;
+ return _messagePublishInfo;
}
- public void setPublishBody(BasicPublishBody publishBody)
+ public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
{
- _publishBody = publishBody;
+ _messagePublishInfo = messagePublishInfo;
}
public List<AMQQueue> getDestinationQueues()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Tue Feb 20 08:20:41 2007
@@ -27,9 +27,9 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -40,9 +40,9 @@
{
private WeakReference<ContentHeaderBody> _contentHeaderBody;
- private WeakReference<BasicPublishBody> _publishBody;
+ private WeakReference<MessagePublishInfo> _messagePublishInfo;
- private List<WeakReference<ContentBody>> _contentBodies;
+ private List<WeakReference<ContentChunk>> _contentBodies;
private boolean _redelivered;
@@ -79,7 +79,7 @@
{
_arrivalTime = mmd.getArrivalTime();
_contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
- _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+ _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo());
}
public int getBodyCount(StoreContext context, Long messageId) throws AMQException
@@ -88,10 +88,10 @@
{
MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
int chunkCount = mmd.getContentChunkCount();
- _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
+ _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount);
for (int i = 0; i < chunkCount; i++)
{
- _contentBodies.add(new WeakReference<ContentBody>(null));
+ _contentBodies.add(new WeakReference<ContentChunk>(null));
}
}
return _contentBodies.size();
@@ -102,19 +102,19 @@
return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
(_contentBodies.size() - 1));
}
- WeakReference<ContentBody> wr = _contentBodies.get(index);
- ContentBody cb = wr.get();
+ WeakReference<ContentChunk> wr = _contentBodies.get(index);
+ ContentChunk cb = wr.get();
if (cb == null)
{
cb = _messageStore.getContentBodyChunk(context, messageId, index);
- _contentBodies.set(index, new WeakReference<ContentBody>(cb));
+ _contentBodies.set(index, new WeakReference<ContentChunk>(cb));
}
return cb;
}
@@ -124,35 +124,36 @@
*
* @param storeContext
* @param messageId
- * @param contentBody
+ * @param contentChunk
* @param isLastContentBody
* @throws AMQException
*/
- public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException
+ public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
{
if (_contentBodies == null && isLastContentBody)
{
- _contentBodies = new ArrayList<WeakReference<ContentBody>>(1);
+ _contentBodies = new ArrayList<WeakReference<ContentChunk>>(1);
}
else
{
if (_contentBodies == null)
{
- _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
}
}
- _contentBodies.add(new WeakReference<ContentBody>(contentBody));
- _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
+ _contentBodies.add(new WeakReference<ContentChunk>(contentChunk));
+ _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1,
+ contentChunk, isLastContentBody);
}
- public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
+ public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
{
- BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
+ MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null);
if (bpb == null)
{
MessageMetaData mmd = loadMessageMetaData(context, messageId);
- bpb = mmd.getPublishBody();
+ bpb = mmd.getMessagePublishInfo();
}
return bpb;
}
@@ -182,7 +183,7 @@
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo publishBody,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
@@ -190,7 +191,7 @@
// create an empty list here
if (contentHeaderBody.bodySize == 0)
{
- _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
}
final long arrivalTime = System.currentTimeMillis();
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class ContentChunkAdapter // Static helpers converting between ContentBody and ContentChunk.
+{
+ public static ContentBody toConentBody(ContentChunk contentBodyChunk) // NOTE: the method name is spelled "Conent" in the source
+ {
+ return new ContentBody(contentBodyChunk.getData()); // new ContentBody built from the chunk's data buffer
+ }
+ // Returns an anonymous ContentChunk whose methods delegate to the given ContentBody (name is spelled "Conent" in the source).
+ public static ContentChunk toConentChunk(final ContentBody contentBodyChunk)
+ {
+ return new ContentChunk() {
+ // Size as reported by the wrapped ContentBody.
+ public int getSize()
+ {
+ return contentBodyChunk.getSize();
+ }
+ // The wrapped body's payload field is returned directly.
+ public ByteBuffer getData()
+ {
+ return contentBodyChunk.payload;
+ }
+ // Forwards to reduceBufferToFit() on the wrapped ContentBody.
+ public void reduceToFit()
+ {
+ contentBodyChunk.reduceBufferToFit();
+ }
+ };
+
+ }
+
+}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Tue Feb 20 08:20:41 2007
@@ -31,10 +31,12 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
/**
* A simple message store that stores the messages in a threadsafe structure in memory.
@@ -49,7 +51,7 @@
protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
- protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap;
+ protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
private final AtomicLong _messageId = new AtomicLong(1);
@@ -57,7 +59,7 @@
{
_log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
}
public void configure(String base, Configuration config)
@@ -65,7 +67,7 @@
int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
_log.info("Using capacity " + hashtableCapacity + " for hash tables");
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
}
public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
@@ -97,6 +99,26 @@
_contentBodyMap.remove(messageId);
}
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ // Intentionally empty: this store takes no action here (compare createQueue, "Not required to do anything").
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ // Intentionally empty: this store takes no action when an exchange is removed.
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ // Intentionally empty: this store takes no action when a queue is bound; all arguments are ignored.
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ // Intentionally empty: this store takes no action when a queue is unbound; all arguments are ignored.
+ }
+
public void createQueue(AMQQueue queue) throws AMQException
{
// Not required to do anything
@@ -147,10 +169,10 @@
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody)
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
throws AMQException
{
- List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
if(bodyList == null && lastContentBody)
{
@@ -160,7 +182,7 @@
{
if (bodyList == null)
{
- bodyList = new ArrayList<ContentBody>();
+ bodyList = new ArrayList<ContentChunk>();
_contentBodyMap.put(messageId, bodyList);
}
@@ -179,9 +201,9 @@
return _metaDataMap.get(messageId);
}
- public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
- List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
}
}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,62 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+public class MessagePublishInfoAdapter // Converts between BasicPublishBody and MessagePublishInfo for one protocol version.
+{
+ private final byte _majorVersion; // protocol major version passed to the constructor
+ private final byte _minorVersion; // protocol minor version passed to the constructor
+ private final int _classId; // result of BasicPublishBody.getClazz(major, minor), looked up once in the constructor
+ private final int _methodId; // result of BasicPublishBody.getMethod(major, minor), looked up once in the constructor
+ // Stores the version pair and resolves the class/method ids that toMethodBody passes to BasicPublishBody.
+
+ public MessagePublishInfoAdapter(byte majorVersion, byte minorVersion)
+ {
+ _majorVersion = majorVersion;
+ _minorVersion = minorVersion;
+ _classId = BasicPublishBody.getClazz(majorVersion,minorVersion);
+ _methodId = BasicPublishBody.getMethod(majorVersion,minorVersion);
+ }
+ // Builds a new BasicPublishBody from pubInfo using this adapter's version and ids; the last argument (ticket) is always 0.
+ public BasicPublishBody toMethodBody(MessagePublishInfo pubInfo)
+ {
+ return new BasicPublishBody(_majorVersion,
+ _minorVersion,
+ _classId,
+ _methodId,
+ pubInfo.getExchange(),
+ pubInfo.isImmediate(),
+ pubInfo.isMandatory(),
+ pubInfo.getRoutingKey(),
+ 0) ; // ticket
+ }
+ // Returns an anonymous MessagePublishInfo whose accessors read from the given body on each call.
+ public MessagePublishInfo toMessagePublishInfo(final BasicPublishBody body)
+ {
+ return new MessagePublishInfo()
+ {
+ // Delegates to body.getExchange().
+ public AMQShortString getExchange()
+ {
+ return body.getExchange();
+ }
+ // Delegates to body.getImmediate().
+ public boolean isImmediate()
+ {
+ return body.getImmediate();
+ }
+ // Delegates to body.getMandatory().
+ public boolean isMandatory()
+ {
+ return body.getMandatory();
+ }
+ // Delegates to body.getRoutingKey().
+ public AMQShortString getRoutingKey()
+ {
+ return body.getRoutingKey();
+ }
+ };
+ }
+}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Tue Feb 20 08:20:41 2007
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.store;
-import java.util.List;
-
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
public interface MessageStore
{
@@ -51,6 +51,15 @@
void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
+ void createExchange(Exchange exchange) throws AMQException;
+
+ void removeExchange(Exchange exchange) throws AMQException;
+
+ void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+ void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+
void createQueue(AMQQueue queue) throws AMQException;
void removeQueue(AMQShortString name) throws AMQException;
@@ -68,24 +77,17 @@
boolean inTran(StoreContext context);
/**
- * Recreate all queues that were persisted, including re-enqueuing of existing messages
- * @return
- * @throws AMQException
- */
- List<AMQQueue> createQueues() throws AMQException;
-
- /**
* Return a valid, currently unused message id.
* @return a message id
*/
Long getNewMessageId();
- void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException;
+ void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException;
void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
- ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+ ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Tue Feb 20 08:20:41 2007
@@ -99,9 +99,11 @@
_queueRegistry = new DefaultQueueRegistry(this);
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
_messageStore = store;
+
+ _exchangeRegistry.initialise();
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
@@ -117,9 +119,11 @@
_queueRegistry = new DefaultQueueRegistry(this);
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
initialiseMessageStore(hostConfig);
+
+ _exchangeRegistry.initialise();
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();