You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/02/21 11:09:09 UTC
svn commit: r629731 [1/3] - in /incubator/qpid/branches/M2.1/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/q...
Author: rgodfrey
Date: Thu Feb 21 02:09:03 2008
New Revision: 629731
URL: http://svn.apache.org/viewvc?rev=629731&view=rev
Log:
QPID-790 : Performance Improvements
Added:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/common/
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/common/support/
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Feb 21 02:09:03 2008
@@ -75,7 +75,7 @@
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out
*/
- private AtomicLong _deliveryTag = new AtomicLong(0);
+ private long _deliveryTag = 0;
/** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
private AMQQueue _defaultQueue;
@@ -99,8 +99,6 @@
private final AtomicBoolean _suspended = new AtomicBoolean(false);
- private final MessageRouter _exchanges;
-
private TransactionalContext _txnContext, _nonTransactedContext;
/**
@@ -124,7 +122,7 @@
public boolean ENABLE_JMSXUserID;
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
+ public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
//Set values from configuration
@@ -136,7 +134,7 @@
_prefetch_HighWaterMark = DEFAULT_PREFETCH;
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
- _exchanges = exchanges;
+
// by default the session is non-transactional
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
@@ -297,7 +295,7 @@
public long getNextDeliveryTag()
{
- return _deliveryTag.incrementAndGet();
+ return ++_deliveryTag;
}
public int getNextConsumerTag()
@@ -969,16 +967,19 @@
public void processReturns(AMQProtocolSession session) throws AMQException
{
- for (RequiredDeliveryException bouncedMessage : _returnMessages)
+ if(!_returnMessages.isEmpty())
{
- AMQMessage message = bouncedMessage.getAMQMessage();
- session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ for (RequiredDeliveryException bouncedMessage : _returnMessages)
+ {
+ AMQMessage message = bouncedMessage.getAMQMessage();
+ session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
- message.decrementReference(_storeContext);
- }
+ message.decrementReference(_storeContext);
+ }
- _returnMessages.clear();
+ _returnMessages.clear();
+ }
}
public boolean wouldSuspend(AMQMessage msg)
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Feb 21 02:09:03 2008
@@ -35,6 +35,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.FixedSizeByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQException;
@@ -275,7 +276,7 @@
// once more testing of the performance of the simple allocator has been done
if (!connectorConfig.enablePooledAllocator)
{
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
}
int port = connectorConfig.port;
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Thu Feb 21 02:09:03 2008
@@ -212,10 +212,9 @@
_logger.debug("Publishing message to queue " + queues);
}
- for (AMQQueue q : queues)
- {
- payload.enqueue(q);
- }
+ payload.enqueue(queues);
+
+
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Thu Feb 21 02:09:03 2008
@@ -26,6 +26,7 @@
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortStringTokenizer;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
@@ -40,11 +41,7 @@
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -84,12 +81,21 @@
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
- private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
+ private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
// private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
- private static final String TOPIC_SEPARATOR = ".";
- private static final String AMQP_STAR = "*";
- private static final String AMQP_HASH = "#";
+ private static final byte TOPIC_SEPARATOR = (byte)'.';
+ private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
+ private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
+ private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
+ private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized =
+ new ConcurrentHashMap<AMQShortString, AMQShortString[]>();
+ private static final byte HASH_BYTE = (byte)'#';
+ private static final byte STAR_BYTE = (byte)'*';
/** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */
@MBeanDescription("Management Bean for Topic Exchange")
@@ -107,7 +113,7 @@
public TabularData bindings() throws OpenDataException
{
_bindingList = new TabularDataSupport(_bindinglistDataType);
- for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _routingKey2queues.entrySet())
+ for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet())
{
AMQShortString key = entry.getKey();
List<String> queueList = new ArrayList<String>();
@@ -156,27 +162,75 @@
assert queue != null;
assert rKey != null;
- AMQShortString routingKey = normalize(rKey);
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey);
- _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
// we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
- List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
+ List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
+
+
+
+
+
+
+
// if we got null back, no previous value was associated with the specified routing key hence
// we need to read back the new value just put into the map
if (queueList == null)
{
- queueList = _routingKey2queues.get(routingKey);
+ queueList = _bindingKey2queues.get(rKey);
}
+
+
if (!queueList.contains(queue))
{
queueList.add(queue);
+
+
+ if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ {
+ AMQShortString routingKey = normalize(rKey);
+ List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
+
+ if(queueList2 == null)
+ {
+ queueList2 = _wildCardBindingKey2queues.get(routingKey);
+ AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR);
+
+ ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens());
+
+ while (keyTok.hasMoreTokens())
+ {
+ keyTokList.add(keyTok.nextToken());
+ }
+
+ _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()]));
+ }
+ queueList2.add(queue);
+
+ }
+ else
+ {
+ List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
+ if(queueList2 == null)
+ {
+ queueList2 = _simpleBindingKey2queues.get(rKey);
+ }
+ queueList2.add(queue);
+
+ }
+
+
+
+
}
else if (_logger.isDebugEnabled())
{
- _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+ _logger.debug("Queue " + queue + " is already registered with routing key " + rKey);
}
+
+
}
private AMQShortString normalize(AMQShortString routingKey)
@@ -186,53 +240,58 @@
routingKey = AMQShortString.EMPTY_STRING;
}
- StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
- List<String> _subscription = new ArrayList<String>();
+ AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
+
+ List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
while (routingTokens.hasMoreTokens())
{
- _subscription.add(routingTokens.nextToken());
+ subscriptionList.add(routingTokens.nextToken());
}
- int size = _subscription.size();
+ int size = subscriptionList.size();
for (int index = 0; index < size; index++)
{
// if there are more levels
if ((index + 1) < size)
{
- if (_subscription.get(index).equals(AMQP_HASH))
+ if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN))
{
- if (_subscription.get(index + 1).equals(AMQP_HASH))
+ if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN))
{
// we don't need #.# delete this one
- _subscription.remove(index);
+ subscriptionList.remove(index);
size--;
// redo this normalisation
index--;
}
- if (_subscription.get(index + 1).equals(AMQP_STAR))
+ if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN))
{
// we don't want #.* swap to *.#
// remove it and put it in at index + 1
- _subscription.add(index + 1, _subscription.remove(index));
+ subscriptionList.add(index + 1, subscriptionList.remove(index));
}
}
} // if we have more levels
}
- StringBuilder sb = new StringBuilder();
- for (String s : _subscription)
+
+ AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
+/*
+ StringBuilder sb = new StringBuilder();
+ for (AMQShortString s : subscriptionList)
{
sb.append(s);
sb.append(TOPIC_SEPARATOR);
}
sb.deleteCharAt(sb.length() - 1);
+*/
- return new AMQShortString(sb.toString());
+ return normalizedString;
}
public void route(AMQMessage payload) throws AMQException
@@ -254,19 +313,14 @@
else
{
_logger.warn("No queues found for routing key " + routingKey);
- _logger.warn("Routing map contains: " + _routingKey2queues);
+ _logger.warn("Routing map contains: " + _bindingKey2queues);
return;
}
}
- for (AMQQueue q : queues)
- {
- // TODO: modify code generator to add clone() method then clone the deliver body
- // without this addition we have a race condition - we will be modifying the body
- // before the encoder has encoded the body for delivery
- payload.enqueue(q);
- }
+ payload.enqueue(queues);
+
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
@@ -276,21 +330,21 @@
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
+ List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
return (queues != null) && queues.contains(queue);
}
public boolean isBound(AMQShortString routingKey)
{
- List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
+ List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
return (queues != null) && !queues.isEmpty();
}
public boolean isBound(AMQQueue queue)
{
- for (List<AMQQueue> queues : _routingKey2queues.values())
+ for (List<AMQQueue> queues : _bindingKey2queues.values())
{
if (queues.contains(queue))
{
@@ -303,7 +357,7 @@
public boolean hasBindings()
{
- return !_routingKey2queues.isEmpty();
+ return !_bindingKey2queues.isEmpty();
}
public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
@@ -311,13 +365,11 @@
assert queue != null;
assert rKey != null;
- AMQShortString routingKey = normalize(rKey);
-
- List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ List<AMQQueue> queues = _bindingKey2queues.get(rKey);
if (queues == null)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + routingKey + ". No queue was registered with that _routing key");
+ + " with routing key " + rKey + ". No queue was registered with that _routing key");
}
@@ -325,12 +377,39 @@
if (!removedQ)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + routingKey);
+ + " with routing key " + rKey);
+ }
+
+
+ if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ {
+ AMQShortString bindingKey = normalize(rKey);
+ List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey);
+ queues2.remove(queue);
+ if(queues2.isEmpty())
+ {
+ _wildCardBindingKey2queues.remove(bindingKey);
+ _bindingKey2Tokenized.remove(bindingKey);
+ }
+
}
+ else
+ {
+ List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey);
+ queues2.remove(queue);
+ if(queues2.isEmpty())
+ {
+ _simpleBindingKey2queues.remove(rKey);
+ }
+
+ }
+
+
+
if (queues.isEmpty())
{
- _routingKey2queues.remove(routingKey);
+ _bindingKey2queues.remove(rKey);
}
}
@@ -349,117 +428,167 @@
public Map<AMQShortString, List<AMQQueue>> getBindings()
{
- return _routingKey2queues;
+ return _bindingKey2queues;
}
private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
{
- List<AMQQueue> list = new LinkedList<AMQQueue>();
- StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
- ArrayList<String> routingkeyList = new ArrayList<String>();
+ List<AMQQueue> list = null;
- while (routingTokens.hasMoreTokens())
+ if(!_wildCardBindingKey2queues.isEmpty())
{
- String next = routingTokens.nextToken();
- if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH))
- {
- continue;
- }
- routingkeyList.add(next);
- }
- for (AMQShortString queue : _routingKey2queues.keySet())
- {
- StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR);
+ AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
+
+ final int routingTokensCount = routingTokens.countTokens();
- ArrayList<String> queueList = new ArrayList<String>();
- while (queTok.hasMoreTokens())
+ AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount];
+
+ if(routingTokensCount == 1)
{
- queueList.add(queTok.nextToken());
+ routingkeyTokens[0] =routingKey;
}
+ else
+ {
- int depth = 0;
- boolean matching = true;
- boolean done = false;
- int routingskip = 0;
- int queueskip = 0;
- while (matching && !done)
- {
- if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip)))
+ int token = 0;
+ while (routingTokens.hasMoreTokens())
{
- done = true;
- // if it was the routing key that ran out of digits
- if (routingkeyList.size() == (depth + routingskip))
- {
- if (queueList.size() > (depth + queueskip))
- { // a hash and it is the last entry
- matching =
- queueList.get(depth + queueskip).equals(AMQP_HASH)
- && (queueList.size() == (depth + queueskip + 1));
- }
- }
- else if (routingkeyList.size() > (depth + routingskip))
+ AMQShortString next = routingTokens.nextToken();
+ /* if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH))
{
- // There is still more routing key to check
- matching = false;
+ continue;
}
+ */
- continue;
+ routingkeyTokens[token++] = next;
}
+ }
+ for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet())
+ {
+
+ AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey);
+
+
+ boolean matching = true;
+ boolean done = false;
- // if the values on the two topics don't match
- if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip)))
+ int depthPlusRoutingSkip = 0;
+ int depthPlusQueueSkip = 0;
+
+ final int bindingKeyTokensCount = bindingKeyTokens.length;
+
+ while (matching && !done)
{
- if (queueList.get(depth + queueskip).equals(AMQP_STAR))
+
+ if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip))
{
- depth++;
+ done = true;
+
+ // if it was the routing key that ran out of digits
+ if (routingTokensCount == depthPlusRoutingSkip)
+ {
+ if (bindingKeyTokensCount > depthPlusQueueSkip)
+ { // a hash and it is the last entry
+ matching =
+ bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)
+ && (bindingKeyTokensCount == (depthPlusQueueSkip + 1));
+ }
+ }
+ else if (routingTokensCount > depthPlusRoutingSkip)
+ {
+ // There is still more routing key to check
+ matching = false;
+ }
continue;
}
- else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
+
+ // if the values on the two topics don't match
+ if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip]))
{
- // Is this a # at the end
- if (queueList.size() == (depth + queueskip + 1))
+ if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN))
{
- done = true;
+ depthPlusQueueSkip++;
+ depthPlusRoutingSkip++;
continue;
}
-
- // otherwise # in the middle
- while (routingkeyList.size() > (depth + routingskip))
+ else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN))
{
- if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
+ // Is this a # at the end
+ if (bindingKeyTokensCount == (depthPlusQueueSkip + 1))
+ {
+ done = true;
+
+ continue;
+ }
+
+ // otherwise # in the middle
+ while (routingTokensCount > depthPlusRoutingSkip)
{
- queueskip++;
- depth++;
+ if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1]))
+ {
+ depthPlusQueueSkip += 2;
+ depthPlusRoutingSkip++;
+
+ break;
+ }
- break;
+ depthPlusRoutingSkip++;
}
- routingskip++;
+ continue;
}
- continue;
+ matching = false;
}
- matching = false;
+ depthPlusQueueSkip++;
+ depthPlusRoutingSkip++;
}
- depth++;
+ if (matching)
+ {
+ if(list == null)
+ {
+ list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey));
+ }
+ else
+ {
+ list.addAll(_wildCardBindingKey2queues.get(bindingKey));
+ }
+ }
}
- if (matching)
+ }
+ if(!_simpleBindingKey2queues.isEmpty())
+ {
+ List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey);
+ if(list == null)
+ {
+ if(queues == null)
+ {
+ list = Collections.EMPTY_LIST;
+ }
+ else
+ {
+ list = new ArrayList<AMQQueue>(queues);
+ }
+ }
+ else if(queues != null)
{
- list.addAll(_routingKey2queues.get(queue));
+ list.addAll(queues);
}
+
}
return list;
+
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Thu Feb 21 02:09:03 2008
@@ -42,6 +42,7 @@
import javax.management.openmbean.TabularDataSupport;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
@@ -205,10 +206,8 @@
_logger.debug("Publishing message to queue " + _queues);
}
- for (AMQQueue q : _queues)
- {
- payload.enqueue(q);
- }
+ payload.enqueue(new ArrayList(_queues));
+
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Thu Feb 21 02:09:03 2008
@@ -29,7 +29,6 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -55,8 +54,8 @@
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore(),
- virtualHost.getExchangeRegistry());
+ final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
+ );
session.addChannel(channel);
ChannelOpenOkBody response;
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Thu Feb 21 02:09:03 2008
@@ -100,8 +100,8 @@
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -151,8 +151,8 @@
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -247,14 +247,13 @@
if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Thu Feb 21 02:09:03 2008
@@ -12,11 +12,14 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -47,10 +50,9 @@
public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
+
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
@@ -60,8 +62,8 @@
if(bodyCount == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
writeFrame(compositeBlock);
}
@@ -75,9 +77,9 @@
//
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
+
+ CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
//
@@ -86,7 +88,7 @@
for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext,messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -95,6 +97,14 @@
}
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
{
@@ -106,8 +116,7 @@
AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
if(bodyCount == 0)
@@ -126,9 +135,9 @@
//
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -137,7 +146,7 @@
for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext, messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -147,7 +156,7 @@
}
- private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
+ private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -158,23 +167,53 @@
final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
- final AMQDataBlock returnBlock = new DeferredDataBlock()
+ final AMQBody returnBlock = new AMQBody()
{
- protected AMQDataBlock createAMQDataBlock()
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
{
- BasicDeliverBody deliverBody =
- METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
deliveryTag,
isRedelivered,
exchangeName,
routingKey);
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame;
+
+
}
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
};
return returnBlock;
}
@@ -225,8 +264,7 @@
{
AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
//
@@ -236,14 +274,13 @@
if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
@@ -272,4 +309,64 @@
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Feb 21 02:09:03 2008
@@ -208,27 +208,10 @@
{
_logger.debug("Frame Received: " + frame);
}
+
+
+ body.handle(channelId, this);
- if (body instanceof AMQMethodBody)
- {
- methodFrameReceived(channelId, (AMQMethodBody) body);
- }
- else if (body instanceof ContentHeaderBody)
- {
- contentHeaderReceived(channelId, (ContentHeaderBody) body);
- }
- else if (body instanceof ContentBody)
- {
- contentBodyReceived(channelId, (ContentBody) body);
- }
- else if (body instanceof HeartbeatBody)
- {
- // NO OP
- }
- else
- {
- _logger.warn("Unrecognised frame " + frame.getClass().getName());
- }
}
private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -271,7 +254,7 @@
}
}
- private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+ public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
@@ -365,7 +348,7 @@
}
catch (Exception e)
{
- _stateManager.error(e);
+
for (AMQMethodListener listener : _frameListeners)
{
listener.error(e);
@@ -375,7 +358,7 @@
}
}
- private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
@@ -384,11 +367,16 @@
}
- private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
channel.publishContentBody(body, this);
+ }
+
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+ {
+ // NO - OP
}
/**
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Thu Feb 21 02:09:03 2008
@@ -30,6 +30,7 @@
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.SessionUtil;
@@ -82,7 +83,7 @@
createSession(protocolSession, _applicationRegistry, codecFactory);
_logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
+ final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
getConfiguredObject(ConnectorConfiguration.class);
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Feb 21 02:09:03 2008
@@ -83,7 +83,7 @@
private long _expiration;
- private final int hashcode = System.identityHashCode(this);
+
private Exchange _exchange;
private static final boolean SYNCED_CLOCKS =
@@ -92,7 +92,7 @@
public String debugIdentity()
{
- return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
+ return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
public void setExpiration()
@@ -141,6 +141,11 @@
_exchange.route(this);
}
+ public void enqueue(final List<AMQQueue> queues)
+ {
+ _transientMessageData.setDestinationQueues(queues);
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -664,7 +669,7 @@
}
finally
{
- destinationQueues.clear();
+
// Remove refence for routing process . Reference count should now == delivered queue count
decrementReference(storeContext);
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Feb 21 02:09:03 2008
@@ -37,9 +37,7 @@
import javax.management.JMException;
import java.text.MessageFormat;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -158,6 +156,8 @@
public AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -200,6 +200,13 @@
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+
+ // This ensure that the notification checks for the configured alerts are created.
+ setMaximumMessageAge(_maximumMessageAge);
+ setMaximumMessageCount(_maximumMessageCount);
+ setMaximumMessageSize(_maximumMessageSize);
+ setMaximumQueueDepth(_maximumQueueDepth);
+
}
private AMQQueueMBean createMBean() throws AMQException
@@ -214,7 +221,7 @@
}
}
- public AMQShortString getName()
+ public final AMQShortString getName()
{
return _name;
}
@@ -540,9 +547,17 @@
return _maximumMessageSize;
}
- public void setMaximumMessageSize(long value)
+ public void setMaximumMessageSize(final long maximumMessageSize)
{
- _maximumMessageSize = value;
+ _maximumMessageSize = maximumMessageSize;
+ if(maximumMessageSize == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
}
public int getConsumerCount()
@@ -565,9 +580,20 @@
return _maximumMessageCount;
}
- public void setMaximumMessageCount(long value)
+ public void setMaximumMessageCount(final long maximumMessageCount)
{
- _maximumMessageCount = value;
+ _maximumMessageCount = maximumMessageCount;
+ if(maximumMessageCount == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+
+
+
}
public long getMaximumQueueDepth()
@@ -576,9 +602,18 @@
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(long value)
+ public void setMaximumQueueDepth(final long maximumQueueDepth)
{
- _maximumQueueDepth = value;
+ _maximumQueueDepth = maximumQueueDepth;
+ if(maximumQueueDepth == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+
}
public long getOldestMessageArrivalTime()
@@ -661,6 +696,10 @@
}
_subscribers.addSubscriber(subscription);
+ if(exclusive)
+ {
+ _subscribers.setExclusive(true);
+ }
}
private boolean isExclusive()
@@ -692,6 +731,7 @@
ps, channel, consumerTag, this));
}
+ _subscribers.setExclusive(false);
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
consumerTag)))
@@ -805,7 +845,7 @@
public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
{
AMQMessage msg = entry.getMessage();
- _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
+ _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -938,6 +978,14 @@
public void setMaximumMessageAge(long maximumMessageAge)
{
_maximumMessageAge = maximumMessageAge;
+ if(maximumMessageAge == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
}
public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
@@ -965,5 +1013,10 @@
_deliveryMgr.removeExpired();
}
}
+ }
+
+ public final Set<NotificationCheck> getNotificationChecks()
+ {
+ return _notificationChecks;
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Feb 21 02:09:03 2008
@@ -54,10 +54,7 @@
import javax.management.openmbean.TabularType;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
@@ -97,6 +94,9 @@
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
private Notification _lastNotification = null;
+
+
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -249,16 +249,21 @@
public void checkForNotification(AMQMessage msg) throws AMQException, JMException
{
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+ final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
- for (NotificationCheck check : NotificationCheck.values())
+ if(!notificationChecks.isEmpty())
{
- if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+
+ for (NotificationCheck check : notificationChecks)
{
- if (check.notifyIfNecessary(msg, _queue, this))
+ if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
- _lastNotificationTimes[check.ordinal()] = currentTime;
+ if (check.notifyIfNecessary(msg, _queue, this))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
}
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Feb 21 02:09:03 2008
@@ -363,8 +363,9 @@
protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
deliveryTag, _queue.getMessageCount());
- _totalMessageSize.addAndGet(-entry.getSize());
+
}
+ _totalMessageSize.addAndGet(-entry.getSize());
if (!acks)
{
@@ -918,7 +919,7 @@
{
if (!s.isSuspended())
{
- if (_log.isDebugEnabled())
+ if (debugEnabled)
{
_log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Thu Feb 21 02:09:03 2008
@@ -29,9 +29,9 @@
{
boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
- int msgCount = queue.getMessageCount();
+ int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
return true;
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Feb 21 02:09:03 2008
@@ -292,14 +292,17 @@
queue.dequeue(storeContext, entry);
}
+/*
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
+ }
+*/
+
synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
- if (_sendLock.get())
- {
- _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
- }
if (_acks)
{
@@ -308,10 +311,11 @@
protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
- if (!_acks)
- {
- entry.getMessage().decrementReference(storeContext);
- }
+
+ }
+ if (!_acks)
+ {
+ entry.getMessage().decrementReference(storeContext);
}
}
finally
@@ -367,59 +371,60 @@
// return false;
}
- final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+
//todo - client id should be recoreded and this test removed but handled below
- if (_noLocal && publisher != null)
+ if (_noLocal)
{
- // We don't want local messages so check to see if message is one we sent
- Object localInstance;
- Object msgInstance;
- if ((protocolSession.getClientProperties() != null) &&
- (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+ if(publisher != null)
+
{
+ // We don't want local messages so check to see if message is one we sent
+ Object localInstance;
+ Object msgInstance;
- if ((publisher.getClientProperties() != null) &&
- (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ if ((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (localInstance == msgInstance || localInstance.equals(msgInstance))
+
+ if ((publisher.getClientProperties() != null) &&
+ (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-// msg.debugIdentity() + ")");
-// }
- return false;
+ if (localInstance == msgInstance || localInstance.equals(msgInstance))
+ {
+ // if (_logger.isTraceEnabled())
+ // {
+ // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ // msg.debugIdentity() + ")");
+ // }
+ return false;
+ }
}
}
- }
- else
- {
+ else
+ {
- localInstance = protocolSession.getClientIdentifier();
- //todo - client id should be recoreded and this test removed but handled here
+ localInstance = protocolSession.getClientIdentifier();
+ //todo - client id should be recoreded and this test removed but handled here
- msgInstance = publisher.getClientIdentifier();
- if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-// msg.debugIdentity() + ")");
-// }
- return false;
+ msgInstance = publisher.getClientIdentifier();
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ {
+ // if (_logger.isTraceEnabled())
+ // {
+ // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ // msg.debugIdentity() + ")");
+ // }
+ return false;
+ }
}
- }
-
+ }
}
- if (_logger.isDebugEnabled())
- {
- _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
- }
return checkFilters(entry);
}
@@ -433,23 +438,7 @@
private boolean checkFilters(QueueEntry msg)
{
- if (_filters != null)
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has filters.");
-// }
- return _filters.allAllow(msg.getMessage());
- }
- else
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no filters");
-// }
-
- return true;
- }
+ return (_filters == null) || _filters.allAllow(msg.getMessage());
}
public Queue<QueueEntry> getPreDeliveryQueue()
@@ -613,7 +602,7 @@
public boolean wouldSuspend(QueueEntry msg)
{
- return channel.wouldSuspend(msg.getMessage());
+ return _acks && channel.wouldSuspend(msg.getMessage());
}
public Queue<QueueEntry> getResendQueue()
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Thu Feb 21 02:09:03 2008
@@ -39,6 +39,7 @@
private int _currentSubscriber;
private final Object _changeLock = new Object();
+ private volatile boolean _exclusive;
/** Accessor for unit tests. */
@@ -116,10 +117,7 @@
*/
public Subscription nextSubscriber(QueueEntry msg)
{
- if (_subscriptions.isEmpty())
- {
- return null;
- }
+
try
{
@@ -143,30 +141,64 @@
private Subscription nextSubscriberImpl(QueueEntry msg)
{
- final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext())
+ if(_exclusive)
{
- Subscription subscription = iterator.next();
- ++_currentSubscriber;
- subscriberScanned();
-
- if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ try
{
- if (subscription.hasInterest(msg))
+ Subscription subscription = _subscriptions.get(0);
+ subscriberScanned();
+
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
{
- // if the queue is not empty then this client is ready to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a message
- // OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ if (subscription.hasInterest(msg))
{
- return subscription;
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
}
}
}
+ catch(IndexOutOfBoundsException e)
+ {
+ }
+ return null;
}
+ else
+ {
+ if (_subscriptions.isEmpty())
+ {
+ return null;
+ }
+ final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
+ while (iterator.hasNext())
+ {
+ Subscription subscription = iterator.next();
+ ++_currentSubscriber;
+ subscriberScanned();
- return null;
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
}
/** Overridden in test classes. */
@@ -233,5 +265,14 @@
{
return _changeLock;
}
-
+
+ public void setExclusive(final boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
+
+ public boolean getExcBoolean()
+ {
+ return _exclusive;
+ }
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Thu Feb 21 02:09:03 2008
@@ -22,6 +22,8 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -60,7 +62,7 @@
* delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
* by the message handle.
*/
- private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+ private List<AMQQueue> _destinationQueues;
public MessagePublishInfo getMessagePublishInfo()
{
@@ -74,7 +76,7 @@
public List<AMQQueue> getDestinationQueues()
{
- return _destinationQueues;
+ return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues;
}
public void setDestinationQueues(List<AMQQueue> destinationQueues)
@@ -109,6 +111,10 @@
public void addDestinationQueue(AMQQueue queue)
{
+ if(_destinationQueues == null)
+ {
+ _destinationQueues = new ArrayList<AMQQueue>();
+ }
_destinationQueues.add(queue);
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Thu Feb 21 02:09:03 2008
@@ -49,11 +49,11 @@
/** Where to put undeliverable messages */
private final List<RequiredDeliveryException> _returnMessages;
- private Set<Long> _browsedAcks;
+ private final Set<Long> _browsedAcks;
private final MessageStore _messageStore;
- private StoreContext _storeContext;
+ private final StoreContext _storeContext;
/** Whether we are in a transaction */
private boolean _inTran;
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Feb 21 02:09:03 2008
@@ -23,6 +23,7 @@
import javax.management.NotCompliantMBeanException;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.security.access.AccessManager;
@@ -123,7 +124,7 @@
*/
public VirtualHost(String name, MessageStore store) throws Exception
{
- this(name, null, store);
+ this(name, new PropertiesConfiguration(), store);
}
/**
Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Thu Feb 21 02:09:03 2008
@@ -173,7 +173,7 @@
public void testQueueDepthAlertWithSubscribers() throws Exception
{
protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
protocolSession.addChannel(channel);
// Create queue
Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Thu Feb 21 02:09:03 2008
@@ -123,7 +123,7 @@
TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
_queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);