You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/23 11:20:45 UTC
svn commit: r510897 [1/2] - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/serv...
Author: ritchiem
Date: Fri Feb 23 02:20:44 2007
New Revision: 510897
URL: http://svn.apache.org/viewvc?view=rev&rev=510897
Log:
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription
BROKER
AMQChannel - updated requeue to resend via the Delivery Manager, not directly via msg.writeDeliver.
BasicRejectMethodHandler - initial place holder.
TxRollbackHandler - Added comment
AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue.
AMQQueue - added the queue reference to the Subscription creation
ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355
DeliveryManager - adjusted deliver call to allow delivery to the head of the queue.
Subscription - changes to allow selection of queue (resend or predelivery), methods to add to the resend queue, and getSendLock to ensure that sending to the Subscription is allowed.
SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription.
SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure.
SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber, so we have access to the resend queue.
AMQStateManager - Added BasicRejectMethodHandler
TransactionalContext - Added option to deliver the messages to the front of the queue.
LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue.
NonTransactionalContext - Added option to deliver the messages to the front of the queue.
DeliverMessageOperation.java DELETED AS NOT USED.
CLIENT
AMQSession - added ability to get the previous state of the dispatcher when setting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover.
BasicMessageConsumer - updated the rollback so that it sends reject messages to server.
AbstractJMSMessage - whitespace + added extra message properties to the toString()
AMQProtocolHandler - whitespace + extra debug output
TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on.
CLUSTER
ClusteredQueue - AMQQueue changes for message deliveryFirst.
RemoteSubscriptionImpl - Implementation of Subscription
SYSTESTS
AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst.
AMQQueueMBeanTest - changes for message deliveryFirst.
ConcurrencyTest - changes for message deliveryFirst.
DeliveryManagerTest - changes for message deliveryFirst.
SubscriptionTestHelper - Implementation of Subscription
WhiteSpace only
UnacknowledgedMessageMapImpl.java
Added:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (with props)
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java (with props)
Removed:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb 23 02:20:44 2007
@@ -46,6 +46,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.Subscription;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -74,28 +75,20 @@
*/
private AtomicLong _deliveryTag = new AtomicLong(0);
- /**
- * A channel has a default queue (the last declared) that is used when no queue name is
- * explictily set
- */
+ /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
private AMQQueue _defaultQueue;
- /**
- * This tag is unique per subscription to a queue. The server returns this in response to a
- * basic.consume request.
- */
+ /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
private int _consumerTag;
/**
- * The current message - which may be partial in the sense that not all frames have been received yet -
- * which has been received by this channel. As the frames are received the message gets updated and once all
- * frames have been received the message can then be routed.
+ * The current message - which may be partial in the sense that not all frames have been received yet - which has
+ * been received by this channel. As the frames are received the message gets updated and once all frames have been
+ * received the message can then be routed.
*/
private AMQMessage _currentMessage;
- /**
- * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
- */
+ /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -109,8 +102,8 @@
private TransactionalContext _txnContext;
/**
- * A context used by the message store enabling it to track context for a given channel even across
- * thread boundaries
+ * A context used by the message store enabling it to track context for a given channel even across thread
+ * boundaries
*/
private final StoreContext _storeContext;
@@ -123,7 +116,6 @@
private final AMQProtocolSession _session;
-
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
@@ -138,9 +130,7 @@
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
- /**
- * Sets this channel to be part of a local transaction
- */
+ /** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
_txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
@@ -293,17 +283,17 @@
}
/**
- * Subscribe to a queue. We register all subscriptions in the channel so that
- * if the channel is closed we can clean up all subscriptions, even if the
- * client does not explicitly unsubscribe from all queues.
- *
- * @param tag the tag chosen by the client (if null, server will generate one)
- * @param queue the queue to subscribe to
- * @param session the protocol session of the subscriber
+ * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
+ * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
+ *
+ * @param tag the tag chosen by the client (if null, server will generate one)
+ * @param queue the queue to subscribe to
+ * @param session the protocol session of the subscriber
* @param noLocal
* @param exclusive
- * @return the consumer tag. This is returned to the subscriber and used in
- * subsequent unsubscribe requests
+ *
+ * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
+ *
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
@@ -335,7 +325,7 @@
}
/**
- * Called from the protocol session to close this channel and clean up.
+ * Called from the protocol session to close this channel and clean up. T
*
* @throws AMQException if there is an error during closure
*/
@@ -344,8 +334,6 @@
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
- _txnContext.commit();
-
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
@@ -362,8 +350,8 @@
* Add a message to the channel-based list of unacknowledged messages
*
* @param message the message that was delivered
- * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
- * the delivery tag)
+ * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
+ * delivery tag)
* @param queue the queue from which the message was delivered
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
@@ -376,8 +364,8 @@
}
/**
- * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
- * May result in delivery to this same channel or to other subscribers.
+ * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
+ * this same channel or to other subscribers.
*
* @throws org.apache.qpid.AMQException if the requeue fails
*/
@@ -386,23 +374,75 @@
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ TransactionalContext nontransacted = null;
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+
for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
if (unacked.queue != null)
{
- _txnContext.deliver(unacked.message, unacked.queue);
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted.deliver(unacked.message, unacked.queue, false);
+ }
+ else
+ {
+ _txnContext.deliver(unacked.message, unacked.queue, false);
+ }
}
}
}
+ public void requeue(long deliveryTag) throws AMQException
+ {
+ UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
- /**
- * Called to resend all outstanding unacknowledged messages to this same channel.
- */
+ if (unacked != null)
+ {
+ TransactionalContext nontransacted = null;
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted.deliver(unacked.message, unacked.queue, false);
+ }
+ else
+ {
+ _txnContext.deliver(unacked.message, unacked.queue, false);
+ }
+ unacked.message.decrementReference(_storeContext);
+ }
+ else
+ {
+ _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists");
+ }
+
+
+ }
+
+
+ /** Called to resend all outstanding unacknowledged messages to this same channel. */
public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+ final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("unacked map contains " + _unacknowledgedMessageMap.size());
+ }
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
@@ -412,21 +452,40 @@
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
- if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended())
+ if (consumerTag != null)
{
- msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ // Consumer exists
+ if (_consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ msgToResend.add(message);
+ }
+ else // consumer has gone
+ {
+ msgToRequeue.add(message);
+ }
}
else
{
// Message has no consumer tag, so was "delivered" to a GET
// or consumer no longer registered
// cannot resend, so re-queue.
- if (message.queue != null && (consumerTag == null || requeue))
+ if (message.queue != null)
+ {
+ if (requeue)
+ {
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+ }
+ }
+ else
{
- msgToRequeue.add(message);
+ _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
}
-
+
// false means continue processing
return false;
}
@@ -436,21 +495,112 @@
}
});
- for(UnacknowledgedMessage message : msgToRequeue)
+ // Process Messages to Resend
+ if (_log.isInfoEnabled())
{
- _txnContext.deliver(message.message, message.queue);
+ if (!msgToResend.isEmpty())
+ {
+ _log.info("Preparing (" + msgToResend.size() + ") message to resend to.");
+ }
+ }
+ for (UnacknowledgedMessage message : msgToResend)
+ {
+ AMQMessage msg = message.message;
+
+ // Our Java Client will always suspend the channel when resending!!
+// if (isSuspended())
+// {
+// _log.info("Channel is suspended so requeuing");
+// //move this message to requeue
+// msgToRequeue.add(message);
+// }
+// else
+ {
+ //release to allow it to be delivered
+ msg.release();
+
+ // Without any details from the client about what has been processed we have to mark
+ // all messages in the unacked map as redelivered.
+ msg.setRedelivered(true);
+
+
+ Subscription sub = msg.getDeliveredSubscription();
+
+ if (sub != null)
+ {
+ synchronized (sub.getSendLock())
+ {
+ if (sub.isClosed())
+ {
+ _log.info("Subscription closed during resend so requeuing message");
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend");
+ }
+ // Will throw an exception if the sub is closed
+ sub.addToResendQueue(msg);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ // Don't decrement as we are bypassing the normal deliver which increments
+ // this is what there is a decrement on the Requeue as deliver will increment.
+ // msg.decrementReference(_storeContext);
+ }
+ }
+ }
+ else
+ {
+ _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ }
+ }
+
+ if (_log.isInfoEnabled())
+ {
+ if (!msgToRequeue.isEmpty())
+ {
+ _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
+ }
+ }
+
+ TransactionalContext nontransacted = null;
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ // Process Messages to Requeue at the front of the queue
+ for (UnacknowledgedMessage message : msgToRequeue)
+ {
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted.deliver(message.message, message.queue, true);
+ }
+ else
+ {
+ _txnContext.deliver(message.message, message.queue, true);
+ }
+
_unacknowledgedMessageMap.remove(message.deliveryTag);
message.message.decrementReference(_storeContext);
}
}
/**
- * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
- * messages to remove the queue reference and also decrement any message reference counts, without
- * actually removing the item since we may get an ack for a delivery tag that was generated from the
- * deleted queue.
+ * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to
+ * remove the queue reference and also decrement any message reference counts, without actually removing the item
+ * since we may get an ack for a delivery tag that was generated from the deleted queue.
*
* @param queue the queue that has been deleted
+ *
* @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
public void queueDeleted(final AMQQueue queue) throws AMQException
@@ -487,6 +637,7 @@
* @param deliveryTag the last delivery tag
* @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
* acknowledges the single message specified by the delivery tag
+ *
* @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
@@ -517,10 +668,10 @@
private void checkSuspension()
{
boolean suspend;
-
- suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
- || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
-
+
+ suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
+ || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+
setSuspended(suspend);
}
@@ -570,8 +721,6 @@
public void rollback() throws AMQException
{
_txnContext.rollback();
-
-
}
public String toString()
@@ -617,8 +766,8 @@
}
else
{
- boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
- if(!willSuspend)
+ boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+ if (!willSuspend)
{
final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
@@ -626,12 +775,17 @@
}
- if(willSuspend)
+ if (willSuspend)
{
setSuspended(true);
}
return willSuspend;
}
+ }
+
+ public TransactionalContext getTransactionalContext()
+ {
+ return _txnContext;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Fri Feb 23 02:20:44 2007
@@ -85,7 +85,6 @@
for (UnacknowledgedMessage msg : msgs)
{
remove(msg.deliveryTag);
-
}
}
}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?view=auto&rev=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Fri Feb 23 02:20:44 2007
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody>
+{
+ private static final Logger _logger = Logger.getLogger(BasicRejectMethodHandler.class);
+
+ private static BasicRejectMethodHandler _instance = new BasicRejectMethodHandler();
+
+ public static BasicRejectMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicRejectMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRejectBody> evt) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue);
+
+ int channelId = evt.getChannelId();
+ UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag);
+
+ _logger.info("Need to reject message:" + message);
+// if (evt.getMethod().requeue)
+// {
+// session.getChannel(channelId).requeue(evt.getMethod().deliveryTag);
+// }
+// else
+// {
+// // session.getChannel(channelId).resend(message);
+// }
+
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Feb 23 02:20:44 2007
@@ -62,6 +62,7 @@
session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
+ // Why, are we not allowed to send messages back to client before the ok method?
channel.resend(session, false);
}
catch (AMQException e)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Feb 23 02:20:44 2007
@@ -36,21 +36,15 @@
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.txn.TransactionalContext;
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- /**
- * Used in clustering
- */
+ /** Used in clustering */
private Set<Object> _tokens;
- /**
- * Only use in clustering - should ideally be removed?
- */
+ /** Only use in clustering - should ideally be removed? */
private AMQProtocolSession _publisher;
private final Long _messageId;
@@ -63,16 +57,14 @@
private TransactionalContext _txnContext;
/**
- * Flag to indicate whether message has been delivered to a
- * consumer. Used in implementing return functionality for
+ * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
/**
- * We need to keep track of whether the message was 'immediate'
- * as in extreme circumstances, when the checkDelieveredToConsumer
- * is called, the message may already have been received and acknowledged,
- * and the body removed from the store.
+ * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the
+ * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
+ * removed from the store.
*/
private boolean _immediate;
@@ -80,11 +72,16 @@
private TransientMessageData _transientMessageData = new TransientMessageData();
+ private Subscription _takenBySubcription;
+ public boolean isTaken()
+ {
+ return _taken.get();
+ }
/**
- * Used to iterate through all the body frames associated with this message. Will not
- * keep all the data in memory therefore is memory-efficient.
+ * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
+ * therefore is memory-efficient.
*/
private class BodyFrameIterator implements Iterator<AMQDataBlock>
{
@@ -103,7 +100,7 @@
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
}
catch (AMQException e)
{
@@ -153,7 +150,7 @@
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
}
catch (AMQException e)
{
@@ -166,7 +163,7 @@
{
try
{
- return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index);
+ return _messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index);
}
catch (AMQException e)
{
@@ -196,12 +193,14 @@
}
/**
- * Used when recovering, i.e. when the message store is creating references to messages.
- * In that case, the normal enqueue/routingComplete is not done since the recovery process
- * is responsible for routing the messages to queues.
+ * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
+ * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
+ * queues.
+ *
* @param messageId
* @param store
* @param factory
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
@@ -213,8 +212,8 @@
}
/**
- * Used in testing only. This allows the passing of the content header immediately
- * on construction.
+ * Used in testing only. This allows the passing of the content header immediately on construction.
+ *
* @param messageId
* @param info
* @param txnContext
@@ -228,14 +227,15 @@
}
/**
- * Used in testing only. This allows the passing of the content header and some body fragments on
- * construction.
+ * Used in testing only. This allows the passing of the content header and some body fragments on construction.
+ *
* @param messageId
* @param info
* @param txnContext
* @param contentHeader
* @param destinationQueues
* @param contentBodies
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessagePublishInfo info,
@@ -280,7 +280,7 @@
}
else
{
- return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId);
+ return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
}
}
@@ -338,16 +338,14 @@
return _messageId;
}
- /**
- * Threadsafe. Increment the reference count on the message.
- */
+ /** Threadsafe. Increment the reference count on the message. */
public void incrementReference()
{
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
}
}
@@ -355,7 +353,7 @@
/**
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
- *
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
@@ -371,7 +369,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
}
@@ -394,13 +392,13 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
}
}
- if(_referenceCount.get()<0)
+ if (_referenceCount.get() < 0)
{
throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
}
@@ -419,7 +417,8 @@
/**
* Called selectors to determin if the message has already been sent
- * @return _deliveredToConsumer
+ *
+ * @return _deliveredToConsumer
*/
public boolean getDeliveredToConsumer()
{
@@ -427,10 +426,17 @@
}
-
- public boolean taken()
+ public boolean taken(Subscription sub)
{
- return _taken.getAndSet(true);
+ if (_taken.getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenBySubcription = sub;
+ return false;
+ }
}
public void release()
@@ -441,9 +447,9 @@
public boolean checkToken(Object token)
{
- if(_tokens==null)
+ if (_tokens == null)
{
- _tokens = new HashSet<Object>();
+ _tokens = new HashSet<Object>();
}
if (_tokens.contains(token))
@@ -458,11 +464,12 @@
}
/**
- * Registers a queue to which this message is to be delivered. This is
- * called from the exchange when it is routing the message. This will be called before any content bodies have
- * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria.
+ * Registers a queue to which this message is to be delivered. This is called from the exchange when it is routing
+ * the message. This will be called before any content bodies have been received so that the choice of
+ * AMQMessageHandle implementation can be picked based on various criteria.
*
* @param queue the queue
+ *
* @throws org.apache.qpid.AMQException if there is an error enqueuing the message
*/
public void enqueue(AMQQueue queue) throws AMQException
@@ -483,16 +490,15 @@
}
else
{
- return _messageHandle.isPersistent(getStoreContext(),_messageId);
+ return _messageHandle.isPersistent(getStoreContext(), _messageId);
}
}
/**
* Called to enforce the 'immediate' flag.
*
- * @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
*/
public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
{
@@ -500,7 +506,7 @@
if (_immediate && !_deliveredToConsumer)
{
throw new NoConsumersException(this);
- }
+ }
}
public MessagePublishInfo getMessagePublishInfo() throws AMQException
@@ -512,7 +518,7 @@
}
else
{
- pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId);
+ pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
}
return pb;
}
@@ -533,10 +539,7 @@
}
- /**
- * Called when this message is delivered to a consumer. (used to
- * implement the 'immediate' flag functionality).
- */
+ /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
@@ -566,7 +569,7 @@
for (AMQQueue q : destinationQueues)
{
- _txnContext.deliver(this, q);
+ _txnContext.deliver(this, q, true);
}
}
finally
@@ -583,23 +586,22 @@
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
- if(bodyCount == 0)
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ contentHeader);
protocolSession.writeFrame(compositeBlock);
}
else
{
-
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -609,9 +611,9 @@
//
// Now start writing out the other content bodies
//
- for(int i = 1; i < bodyCount; i++)
+ for (int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -627,22 +629,21 @@
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
- if(bodyCount == 0)
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ contentHeader);
protocolSession.writeFrame(compositeBlock);
}
else
{
-
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -652,9 +653,9 @@
//
// Now start writing out the other content bodies
//
- for(int i = 1; i < bodyCount; i++)
+ for (int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -685,10 +686,10 @@
AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.getExchange(),
- queueSize,
- _messageHandle.isRedelivered(),
- pb.getRoutingKey());
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
getOkFrame.writePayload(buf);
buf.flip();
@@ -699,7 +700,7 @@
{
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
+ protocolSession.getProtocolMinorVersion(),
getMessagePublishInfo().getExchange(),
replyCode, replyText,
getMessagePublishInfo().getRoutingKey());
@@ -757,12 +758,11 @@
}
catch (AMQException e)
{
- _log.error(e.toString(),e);
+ _log.error(e.toString(), e);
return 0;
}
- }
-
+ }
public void restoreTransientMessageData() throws AMQException
@@ -771,7 +771,7 @@
transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
transientMessageData.setContentHeaderBody(getContentHeaderBody());
transientMessageData.addBodyLength(getContentHeaderBody().getSize());
- _transientMessageData = transientMessageData;
+ _transientMessageData = transientMessageData;
}
@@ -784,6 +784,11 @@
public String toString()
{
return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
- _taken;
+ _taken + " by:" + _takenBySubcription;
+ }
+
+ public Subscription getDeliveredSubscription()
+ {
+ return _takenBySubcription;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb 23 02:20:44 2007
@@ -45,13 +45,11 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
/**
- * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
- * that. It is described fully in RFC 006.
+ * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
+ * fully in RFC 006.
*/
public class AMQQueue implements Managable, Comparable
{
-
-
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -74,26 +72,19 @@
private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
-
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
private final AMQShortString _name;
- /**
- * null means shared
- */
+ /** null means shared */
private final AMQShortString _owner;
private final boolean _durable;
- /**
- * If true, this queue is deleted when the last subscriber is removed
- */
+ /** If true, this queue is deleted when the last subscriber is removed */
private final boolean _autoDelete;
- /**
- * Holds subscribers to the queue.
- */
+ /** Holds subscribers to the queue. */
private final SubscriptionSet _subscribers;
private final SubscriptionFactory _subscriptionFactory;
@@ -106,20 +97,13 @@
private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
- /**
- * Manages message delivery.
- */
+ /** Manages message delivery. */
private final DeliveryManager _deliveryMgr;
- /**
- * Used to track bindings to exchanges so that on deletion they can easily
- * be cancelled.
- */
+ /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
private final ExchangeBindings _bindings = new ExchangeBindings(this);
- /**
- * Executor on which asynchronous delivery will be carriedout where required
- */
+ /** Executor on which asynchronous delivery will be carried out where required */
private final Executor _asyncDelivery;
private final AMQQueueMBean _managedObject;
@@ -127,39 +111,27 @@
private final VirtualHost _virtualHost;
- /**
- * max allowed size(KB) of a single message
- */
+ /** max allowed size(KB) of a single message */
@Configured(path = "maximumMessageSize", defaultValue = "0")
public long _maximumMessageSize;
- /**
- * max allowed number of messages on a queue.
- */
+ /** max allowed number of messages on a queue. */
@Configured(path = "maximumMessageCount", defaultValue = "0")
public int _maximumMessageCount;
- /**
- * max queue depth for the queue
- */
+ /** max queue depth for the queue */
@Configured(path = "maximumQueueDepth", defaultValue = "0")
public long _maximumQueueDepth;
- /**
- * maximum message age before alerts occur
- */
+ /** maximum message age before alerts occur */
@Configured(path = "maximumMessageAge", defaultValue = "0")
public long _maximumMessageAge;
- /**
- * the minimum interval between sending out consequetive alerts of the same type
- */
+ /** the minimum interval between sending out consecutive alerts of the same type */
@Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
public long _minimumAlertRepeatGap;
- /**
- * total messages received by the queue since startup.
- */
+ /** total messages received by the queue since startup. */
public AtomicLong _totalMessagesReceived = new AtomicLong();
public int compareTo(Object o)
@@ -176,7 +148,6 @@
}
-
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, VirtualHost virtualHost,
SubscriptionSet subscribers)
@@ -211,7 +182,7 @@
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
private AMQQueueMBean createMBean() throws AMQException
@@ -251,17 +222,13 @@
return _autoDelete;
}
- /**
- * @return no of messages(undelivered) on the queue.
- */
+ /** @return no of messages(undelivered) on the queue. */
public int getMessageCount()
{
return _deliveryMgr.getQueueMessageCount();
}
- /**
- * @return List of messages(undelivered) on the queue.
- */
+ /** @return List of messages(undelivered) on the queue. */
public List<AMQMessage> getMessagesOnTheQueue()
{
return _deliveryMgr.getMessages();
@@ -275,6 +242,7 @@
/**
* @param messageId
+ *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -294,13 +262,12 @@
}
/**
- * moves messages from this queue to another queue. to do this the approach is following-
- * - setup the queue for moving messages (hold the lock and stop the async delivery)
- * - get all the messages available in the given message id range
- * - setup the other queue for moving messages (hold the lock and stop the async delivery)
- * - send these available messages to the other queue (enqueue in other queue)
- * - Once sending to other Queue is successful, remove messages from this queue
- * - remove locks from both queues and start async delivery
+ * Moves messages from this queue to another queue. The approach is as follows: (1) set up this queue for
+ * moving messages (hold the lock and stop the async delivery); (2) get all the messages available in the given
+ * message id range; (3) set up the other queue for moving messages (hold the lock and stop the async delivery);
+ * (4) send these available messages to the other queue (enqueue in other queue); (5) once sending to the other
+ * queue is successful, remove the messages from this queue; (6) remove locks from both queues and start async delivery.
+ *
* @param fromMessageId
* @param toMessageId
* @param queueName
@@ -316,7 +283,7 @@
startMovingMessages();
List<AMQMessage> list = getMessagesOnTheQueue();
List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
- int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
+ int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1);
// Run this loop till you find all the messages or the list has no more messages
for (AMQMessage message : list)
@@ -344,7 +311,7 @@
{
// remove the lock and start the async delivery
anotherQueue.stopMovingMessages();
- stopMovingMessages();
+ stopMovingMessages();
}
}
@@ -364,10 +331,8 @@
_deliveryMgr.stopMovingMessages();
_deliveryMgr.processAsync(_asyncDelivery);
}
-
- /**
- * @return MBean object associated with this Queue
- */
+
+ /** @return MBean object associated with this Queue */
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -422,20 +387,16 @@
public long getOldestMessageArrivalTime()
{
return _deliveryMgr.getOldestMessageArrival();
-
+
}
- /**
- * Removes the AMQMessage from the top of the queue.
- */
+ /** Removes the AMQMessage from the top of the queue. */
public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext);
}
- /**
- * removes all the messages from the queue.
- */
+ /** removes all the messages from the queue. */
public synchronized long clearQueue(StoreContext storeContext) throws AMQException
{
return _deliveryMgr.clearAllMessages(storeContext);
@@ -443,10 +404,10 @@
public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
- exchange.registerQueue(routingKey, this, arguments);
- if(isDurable() && exchange.isDurable())
+ exchange.registerQueue(routingKey, this, arguments);
+ if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().bindQueue(exchange,routingKey,this,arguments);
+ _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
}
_bindings.addBinding(routingKey, arguments, exchange);
}
@@ -454,9 +415,9 @@
public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
exchange.deregisterQueue(routingKey, this, arguments);
- if(isDurable() && exchange.isDurable())
+ if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().unbindQueue(exchange,routingKey,this,arguments);
+ _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
}
_bindings.remove(routingKey, arguments, exchange);
}
@@ -466,30 +427,31 @@
FieldTable filters, boolean noLocal, boolean exclusive)
throws AMQException
{
- if(incrementSubscriberCount() > 1)
+ if (incrementSubscriberCount() > 1)
{
- if(isExclusive())
+ if (isExclusive())
{
decrementSubscriberCount();
throw EXISTING_EXCLUSIVE;
}
- else if(exclusive)
+ else if (exclusive)
{
decrementSubscriberCount();
throw EXISTING_SUBSCRIPTION;
}
}
- else if(exclusive)
+ else if (exclusive)
{
setExclusive(true);
}
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
+ filters, noLocal, this);
- if(subscription.hasFilters())
+ if (subscription.hasFilters())
{
if (_deliveryMgr.hasQueuedMessages())
{
@@ -537,10 +499,10 @@
" and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
+ removedSubscription.close();
setExclusive(false);
decrementSubscriberCount();
-
// if we are eligible for auto deletion, unregister from the queue registry
if (_autoDelete && _subscribers.isEmpty())
{
@@ -583,13 +545,13 @@
public void delete() throws AMQException
{
- if(!_deleted.getAndSet(true))
+ if (!_deleted.getAndSet(true))
{
_subscribers.queueDeleted(this);
_bindings.deregister();
_virtualHost.getQueueRegistry().unregisterQueue(_name);
_managedObject.unregister();
- for(Task task : _deleteTaskList)
+ for (Task task : _deleteTaskList)
{
task.doTask(this);
}
@@ -605,7 +567,8 @@
public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
{
- _deliveryMgr.deliver(storeContext, getName(), msg);
+ //fixme not sure what this is doing. should we be passing deliverFirst through here?
+ _deliveryMgr.deliver(storeContext, getName(), msg, false);
try
{
msg.checkDeliveredToConsumer();
@@ -620,9 +583,9 @@
}
- public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
- _deliveryMgr.deliver(storeContext, getName(), msg);
+ _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -731,7 +694,7 @@
public static interface Task
{
- public void doTask(AMQQueue queue) throws AMQException;
+ public void doTask(AMQQueue queue) throws AMQException;
}
public void addQueueDeleteTask(Task task)
@@ -759,4 +722,8 @@
_maximumMessageAge = maximumMessageAge;
}
+ public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg)
+ {
+ _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
+ }
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=510897&r1=510896&r2=510897
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Feb 23 02:20:44 2007
@@ -24,9 +24,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
@@ -38,12 +43,12 @@
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.MessageQueue;
+import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-/**
- * Manages delivery of messages on behalf of a queue
- */
+/** Manages delivery of messages on behalf of a queue */
public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
@@ -51,47 +56,36 @@
@Configured(path = "advanced.compressBufferOnQueue",
defaultValue = "false")
public boolean compressBufferOnQueue;
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ /** Holds any queued messages */
+ private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
- private final ReentrantLock _messageAccessLock = new ReentrantLock();
-
- //private int _messageCount;
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
+ /** Ensures that only one asynchronous task is running for this manager at any time. */
private final AtomicBoolean _processing = new AtomicBoolean();
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
+ /** The subscriptions on the queue to whom messages are delivered */
private final SubscriptionManager _subscriptions;
/**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
+ * A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles
+ * acknowledgements a handle on the queue.
*/
private final AMQQueue _queue;
/**
- * Flag used while moving messages from this queue to another. For moving messages the async delivery
- * should also stop. This flat should be set to true to stop async delivery and set to false to enable
- * async delivery again.
+ * Flag used while moving messages from this queue to another. For moving messages the async delivery should also
+ * stop. This flag should be set to true to stop async delivery and set to false to enable async delivery again.
*/
private AtomicBoolean _movingMessages = new AtomicBoolean();
-
+
/**
* Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
- * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
- * via the async thread.
- * <p/>
- * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+ * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be
+ * delivered via the async thread. <p/> Lock is used to control access to hasQueuedMessages() and over the addition
+ * of messages to the queue.
*/
private ReentrantLock _lock = new ReentrantLock();
private AtomicLong _totalMessageSize = new AtomicLong();
-
+ private AtomicInteger _extraMessages = new AtomicInteger();
+ private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -109,7 +103,7 @@
}
- private boolean addMessageToQueue(AMQMessage msg)
+ private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
{
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
@@ -122,7 +116,14 @@
}
}
- _messages.offer(msg);
+ if (deliverFirst)
+ {
+ _messages.pushHead(msg);
+ }
+ else
+ {
+ _messages.offer(msg);
+ }
_totalMessageSize.addAndGet(msg.getSize());
@@ -135,7 +136,7 @@
_lock.lock();
try
{
- return !_messages.isEmpty();
+ return !(_messages.isEmpty() && _hasContent.isEmpty());
}
finally
{
@@ -149,18 +150,17 @@
}
/**
- * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
- * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+ * This is an EXPENSIVE operation to perform with a ConcurrentLinkedQueue as it must run the queue to determine
+ * size. The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
*
* @return int the number of messages in the delivery queue.
*/
private int getMessageCount()
{
- return _messages.size();
+ return _messages.size() + _extraMessages.get();
}
-
public long getTotalMessageSize()
{
return _totalMessageSize.get();
@@ -172,6 +172,38 @@
return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
}
+ public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+ {
+ _lock.lock();
+ try
+ {
+ if (hasContent)
+ {
+ _log.debug("Queue has adding subscriber content");
+ _hasContent.add(subscription);
+ _totalMessageSize.addAndGet(msg.getSize());
+ _extraMessages.addAndGet(1);
+ }
+ else
+ {
+ _log.debug("Queue has removing subscriber content");
+ if (msg == null)
+ {
+ _hasContent.remove(subscription);
+ }
+ else
+ {
+ _totalMessageSize.addAndGet(-msg.getSize());
+ _extraMessages.addAndGet(-1);
+ }
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
public List<AMQMessage> getMessages()
{
@@ -195,7 +227,7 @@
AMQMessage message = currentQueue.next();
if (subscription.hasInterest(message))
{
- subscription.enqueueForPreDelivery(message);
+ subscription.enqueueForPreDelivery(message, false);
}
}
}
@@ -203,7 +235,7 @@
public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
{
AMQMessage msg = getNextMessage();
- if(msg == null)
+ if (msg == null)
{
return false;
}
@@ -229,7 +261,7 @@
}
_queue.dequeue(channel.getStoreContext(), msg);
}
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -252,8 +284,8 @@
}
/**
- * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag,
- * so that the asyn delivery is also stopped.
+ * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, so that
+ * the asyn delivery is also stopped.
*/
public void startMovingMessages()
{
@@ -262,8 +294,8 @@
}
/**
- * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag,
- * so that the async delivery can start again.
+ * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, so that
+ * the async delivery can start again.
*/
public void stopMovingMessages()
{
@@ -276,6 +308,7 @@
/**
* Messages will be removed from this queue and all preDeliveryQueues
+ *
* @param messageList
*/
public void removeMovedMessages(List<AMQMessage> messageList)
@@ -308,7 +341,9 @@
/**
* Now with implementation of predelivery queues, this method will mark the message on the top as taken.
+ *
* @param storeContext
+ *
* @throws AMQException
*/
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
@@ -318,11 +353,11 @@
if (msg != null)
{
// mark this message as taken and get it removed
- msg.taken();
+ msg.taken(null);
_queue.dequeue(storeContext, msg);
getNextMessage();
}
-
+
_lock.unlock();
}
@@ -335,7 +370,7 @@
while (msg != null)
{
//mark this message as taken and get it removed
- msg.taken();
+ msg.taken(null);
_queue.dequeue(storeContext, msg);
msg = getNextMessage();
count++;
@@ -347,20 +382,15 @@
public synchronized AMQMessage getNextMessage() throws AMQException
{
- return getNextMessage(_messages);
+ return getNextMessage(_messages, null);
}
-
- private AMQMessage getNextMessage(Queue<AMQMessage> messages)
- {
- return getNextMessage(messages, false);
- }
-
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
{
AMQMessage message = messages.peek();
- while (message != null && (browsing || message.taken()))
+
+ while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub)))
{
//remove the already taken message
messages.poll();
@@ -371,27 +401,76 @@
return message;
}
- public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
+ public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
{
+
+ Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+ ") from queue (" + System.identityHashCode(messageQueue) +
+ ") AMQQueue (" + System.identityHashCode(queue) + ")");
+ }
+
+ if (messageQueue == null)
+ {
+ // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+ }
+ return;
+ }
+
AMQMessage message = null;
try
{
- message = getNextMessage(messageQueue, sub.isBrowser());
+ message = getNextMessage(messageQueue, sub);
// message will be null if we have no messages in the messageQueue.
if (message == null)
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ }
return;
}
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message:" + message + " to :" + sub);
+ _log.debug("Async Delivery Message (" + System.identityHashCode(message) +
+ ") by :" + System.identityHashCode(this) +
+ ") to :" + System.identityHashCode(sub));
}
sub.send(message, _queue);
//remove sent message from our queue.
messageQueue.poll();
+ //The sent message must be removed from the queue it was taken from (done above),
+ // otherwise the Async send will never end
+
+ if (messageQueue == sub.getResendQueue())
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("All messages sent from resendQueue for " + sub);
+ }
+ if (messageQueue.isEmpty())
+ {
+ subscriberHasPendingResend(false, sub, null);
+ //better to use the above method as this keeps all the tracking in one location.
+// _hasContent.remove(sub);
+ }
+
+ _extraMessages.decrementAndGet();
+ }
+ else if (messageQueue == sub.getPreDeliveryQueue())
+ {
+ _log.info("We could do clean up of the main _message queue here");
+ }
+
_totalMessageSize.addAndGet(-message.getSize());
}
catch (AMQException e)
@@ -403,6 +482,7 @@
/**
* enqueues the messages in the list on the queue and all required predelivery queues
+ *
* @param storeContext
* @param movedMessageList
*/
@@ -411,7 +491,7 @@
_lock.lock();
for (AMQMessage msg : movedMessageList)
{
- addMessageToQueue(msg);
+ addMessageToQueue(msg, true);
}
// enqueue on the pre delivery queues
@@ -422,7 +502,7 @@
// Only give the message to those that want them.
if (sub.hasInterest(msg))
{
- sub.enqueueForPreDelivery(msg);
+ sub.enqueueForPreDelivery(msg, true);
}
}
}
@@ -430,8 +510,8 @@
}
/**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
+ * Only one thread should ever execute this method concurrently, but it can do so while other threads invoke
+ * deliver().
*/
private void processQueue()
{
@@ -444,40 +524,43 @@
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended())
+ synchronized (sub.getSendLock())
{
- sendNextMessage(sub);
+ if (!sub.isSuspended())
+ {
+ sendNextMessage(sub, _queue);
- hasSubscribers = true;
+ hasSubscribers = true;
+ }
}
}
}
}
- private void sendNextMessage(Subscription sub)
- {
- if (sub.hasFilters())
- {
- sendNextMessage(sub, sub.getPreDeliveryQueue());
- if (sub.isAutoClose())
- {
- if (sub.getPreDeliveryQueue().isEmpty())
- {
- sub.close();
- }
- }
- }
- else
- {
- sendNextMessage(sub, _messages);
- }
- }
+// private void sendNextMessage(Subscription sub)
+// {
+// if (sub.hasFilters())
+// {
+// sendNextMessage(sub, sub.getPreDeliveryQueue());
+// if (sub.isAutoClose())
+// {
+// if (sub.getPreDeliveryQueue().isEmpty())
+// {
+// sub.close();
+// }
+// }
+// }
+// else
+// {
+// sendNextMessage(sub, _messages);
+// }
+// }
- public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException
+ public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "deliver :" + msg);
+ _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg);
}
msg.release();
@@ -491,11 +574,11 @@
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
+ _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
}
if (!msg.getMessagePublishInfo().isImmediate())
{
- addMessageToQueue(msg);
+ addMessageToQueue(msg, deliverFirst);
//release lock now message is on queue.
_lock.unlock();
@@ -504,7 +587,7 @@
if (_log.isDebugEnabled())
{
_log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
- " subscribers to give the message to.");
+ " subscribers to give the message to:" + currentStatus());
}
for (Subscription sub : _subscriptions.getSubscriptions())
{
@@ -528,7 +611,7 @@
_log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
- sub.enqueueForPreDelivery(msg);
+ sub.enqueueForPreDelivery(msg, deliverFirst);
}
}
}
@@ -537,14 +620,47 @@
{
//release lock now
_lock.unlock();
-
- if (_log.isDebugEnabled())
+ synchronized (s.getSendLock())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
- System.identityHashCode(s) + ") :" + s);
+ if (!s.isSuspended())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ System.identityHashCode(s) + ") :" + s);
+ }
+ msg.taken(s);
+ //Deliver the message
+ s.send(msg, _queue);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send");
+ }
+ }
+
+ if (!msg.isTaken())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" +
+ " Subscriber:" + System.identityHashCode(s));
+ }
+
+ deliver(context, name, msg, deliverFirst);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) +
+ ") has been taken so disregarding deliver request to Subscriber:" +
+ System.identityHashCode(s));
+ }
+ }
}
- //Deliver the message
- s.send(msg, _queue);
}
}
finally
@@ -593,9 +709,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
+ _log.debug("Processing Async." + currentStatus());
}
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
@@ -606,6 +720,18 @@
executor.execute(asyncDelivery);
}
}
+ }
+
+ private String currentStatus()
+ {
+     // Summary of delivery state for debug logs; each figure appears once (the cast-and-size() and the
+     // trailing " Queued:" segment only repeated the same _messages values).
+     return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
+            "(" + _messages.size() + ") " +
+            " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
+            "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
+            " Active:" + _subscriptions.hasActiveSubscribers() +
+            " Processing:" + _processing.get();
+ }
}