You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/11/29 14:53:02 UTC
svn commit: r1771912 - in
/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src:
main/java/org/apache/qpid/server/protocol/v0_8/
test/java/org/apache/qpid/server/protocol/v0_8/
Author: kwall
Date: Tue Nov 29 14:53:02 2016
New Revision: 1771912
URL: http://svn.apache.org/viewvc?rev=1771912&view=rev
Log:
QPID-7425: [Java Broker] Resend message only if message can be made unstealable for the current consumer
Added:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java
- copied, changed from r1771878, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
Removed:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Nov 29 14:53:02 2016
@@ -46,6 +46,8 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,6 +86,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
+import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap.Visitor;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageHandle;
@@ -111,7 +114,16 @@ public class AMQChannel
public static final int DEFAULT_PREFETCH = 4096;
private static final Logger _logger = LoggerFactory.getLogger(AMQChannel.class);
- public static final InfiniteCreditCreditManager INFINITE_CREDIT_CREDIT_MANAGER = new InfiniteCreditCreditManager();
+ private static final InfiniteCreditCreditManager INFINITE_CREDIT_CREDIT_MANAGER = new InfiniteCreditCreditManager();
+ private static final Function<MessageConsumerAssociation, MessageInstance>
+ MESSAGE_INSTANCE_FUNCTION = new Function<MessageConsumerAssociation, MessageInstance>()
+ {
+ @Override
+ public MessageInstance apply(final MessageConsumerAssociation input)
+ {
+ return input.getMessageInstance();
+ }
+ };
private final DefaultQueueAssociationClearingTask
_defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
@@ -178,7 +190,7 @@ public class AMQChannel
private LogSubject _logSubject;
private volatile boolean _rollingBack;
- private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
+ private List<MessageConsumerAssociation> _resendList = new ArrayList<>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
@@ -970,14 +982,13 @@ public class AMQChannel
*/
private void requeue()
{
- final Map<Long, MessageInstance> unackedMapCopy = new LinkedHashMap<>();
- // we must create a new map since all the messages will get a new delivery tag when they are redelivered
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ final Map<Long, MessageConsumerAssociation> copy = new LinkedHashMap<>();
+ _unacknowledgedMessageMap.visit(new Visitor()
{
@Override
- public boolean callback(final long deliveryTag, final MessageInstance message)
+ public boolean callback(final long deliveryTag, final MessageConsumerAssociation messageConsumerPair)
{
- unackedMapCopy.put(deliveryTag, message);
+ copy.put(deliveryTag, messageConsumerPair);
return false;
}
@@ -988,24 +999,24 @@ public class AMQChannel
}
});
- if (!unackedMapCopy.isEmpty())
+ if (!copy.isEmpty())
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Requeuing " + unackedMapCopy.size() + " unacked messages. for " + toString());
+ _logger.debug("Requeuing {} unacked messages", copy.size());
}
-
}
- for (Map.Entry<Long,MessageInstance> entry : unackedMapCopy.entrySet())
+ for (Map.Entry<Long, MessageConsumerAssociation> entry : copy.entrySet())
{
- MessageInstance unacked = entry.getValue();
+ MessageInstance unacked = entry.getValue().getMessageInstance();
+ ConsumerImpl consumer = entry.getValue().getConsumer();
// Mark message redelivered
unacked.setRedelivered();
// here we wish to restore credit
_unacknowledgedMessageMap.remove(entry.getKey(), true);
// Ensure message is released for redelivery
- unacked.release(unacked.getAcquiringConsumer());
+ unacked.release(consumer);
}
}
@@ -1019,22 +1030,20 @@ public class AMQChannel
public void requeue(long deliveryTag)
{
- final UnacknowledgedMessageMap.Entry entry = _unacknowledgedMessageMap.remove(deliveryTag, true);
+ final MessageConsumerAssociation association = _unacknowledgedMessageMap.remove(deliveryTag, true);
- if (entry != null)
+ if (association != null)
{
- MessageInstance unacked = entry.getMessageInstance();
+ MessageInstance unacked = association.getMessageInstance();
// Mark message redelivered
unacked.setRedelivered();
// Ensure message is released for redelivery
- unacked.release(entry.getConsumer());
+ unacked.release(association.getConsumer());
}
else
{
- _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
- + _unacknowledgedMessageMap.size());
-
+ _logger.warn("Requested requeue of message: {} but no such delivery tag exists.", deliveryTag);
}
}
@@ -1064,97 +1073,76 @@ public class AMQChannel
return false;
}
- private boolean resendInternal(MessageInstance messageInstance)
- {
- ConsumerImpl subscriber = messageInstance.getDeliveredConsumer();
- if (subscriber != null && subscriber.getSessionModel() == this)
- {
- ConsumerTarget target = subscriber.getTarget();
- if (target.getState() != ConsumerTarget.State.CLOSED)
- {
- target.send(subscriber, messageInstance, false);
- return true;
- }
-
- }
- return false;
- }
-
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
*/
private void resend()
{
-
-
- final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
- final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageConsumerAssociation> msgToRequeue = new LinkedHashMap<>();
+ final Map<Long, MessageConsumerAssociation> msgToResend = new LinkedHashMap<>();
if (_logger.isDebugEnabled())
{
- _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
+ _logger.debug("Unacknowledged messages: {}", _unacknowledgedMessageMap.size());
}
- // Process the Unacked-Map.
- // Marking messages who still have a consumer for to be resent
- // and those that don't to be requeued.
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
- msgToRequeue,
- msgToResend
- ));
-
-
- // Process Messages to Resend
- if (_logger.isDebugEnabled())
+ _unacknowledgedMessageMap.visit(new Visitor()
{
- if (!msgToResend.isEmpty())
+ @Override
+ public boolean callback(final long deliveryTag, final MessageConsumerAssociation association)
{
- _logger.debug("Preparing (" + msgToResend.size() + ") message to resend.");
+
+ if (association.getConsumer().isClosed())
+ {
+ // consumer has gone
+ msgToRequeue.put(deliveryTag, association);
+ }
+ else
+ {
+ // Consumer still exists
+ msgToResend.put(deliveryTag, association);
+ }
+ return false;
}
- else
+
+ @Override
+ public void visitComplete()
{
- _logger.debug("No message to resend.");
}
- }
+ });
- for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet())
+
+ for (Map.Entry<Long, MessageConsumerAssociation> entry : msgToResend.entrySet())
{
- MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
-
- //Amend the delivery counter as the client hasn't seen these messages yet.
- message.decrementDeliveryCount();
+ MessageInstance message = entry.getValue().getMessageInstance();
+ ConsumerImpl consumer = entry.getValue().getConsumer();
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
message.setRedelivered();
- if (resendInternal(message))
+ if (message.makeAcquisitionUnstealable(consumer))
{
+ message.decrementDeliveryCount();
+
+ consumer.getTarget().send(consumer, message, false);
// remove from unacked map - don't want to restore credit though(!)
_unacknowledgedMessageMap.remove(deliveryTag, false);
}
else
{
- msgToRequeue.put(deliveryTag, message);
- }
- } // for all messages
- // } else !isSuspend
-
- if (_logger.isDebugEnabled())
- {
- if (!msgToRequeue.isEmpty())
- {
- _logger.debug("Preparing (" + msgToRequeue.size() + ") message to requeue");
+ msgToRequeue.put(deliveryTag, entry.getValue());
}
}
// Process Messages to Requeue at the front of the queue
- for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet())
+ for (Map.Entry<Long, MessageConsumerAssociation> entry : msgToRequeue.entrySet())
{
- MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
+ MessageInstance message = entry.getValue().getMessageInstance();
+ ConsumerImpl consumer = entry.getValue().getConsumer();
//Amend the delivery counter as the client hasn't seen these messages yet.
message.decrementDeliveryCount();
@@ -1163,18 +1151,12 @@ public class AMQChannel
_unacknowledgedMessageMap.remove(deliveryTag, true);
message.setRedelivered();
- message.release(message.getAcquiringConsumer());
-
+ message.release(consumer);
}
}
- /**
- * Used only for testing purposes.
- *
- * @return the map of unacknowledged messages
- */
- public UnacknowledgedMessageMap getUnacknowledgedMessageMap()
+ private UnacknowledgedMessageMap getUnacknowledgedMessageMap()
{
return _unacknowledgedMessageMap;
}
@@ -1222,13 +1204,7 @@ public class AMQChannel
}
}
- public boolean isSuspended()
- {
- return _suspended.get() || _closing.get() || _connection.isClosing();
- }
-
-
- public void commit(final Runnable immediateAction, boolean async)
+ private void commit(final Runnable immediateAction, boolean async)
{
@@ -1294,16 +1270,17 @@ public class AMQChannel
postRollbackTask.run();
- for(MessageInstance entry : _resendList)
+ for(MessageConsumerAssociation association : _resendList)
{
- ConsumerImpl sub = entry.getAcquiringConsumer();
- if (sub == null || sub.isClosed())
+ final MessageInstance messageInstance = association.getMessageInstance();
+ final ConsumerImpl consumer = association.getConsumer();
+ if (consumer.isClosed())
{
- entry.release(sub);
+ messageInstance.release(consumer);
}
else
{
- resendInternal(entry);
+ consumer.getTarget().send(consumer, messageInstance, false);
}
}
_resendList.clear();
@@ -1579,9 +1556,9 @@ public class AMQChannel
private class MessageAcknowledgeAction implements ServerTransaction.Action
{
- private Collection<MessageInstance> _ackedMessages;
+ private Collection<MessageConsumerAssociation> _ackedMessages;
- public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages)
+ public MessageAcknowledgeAction(Collection<MessageConsumerAssociation> ackedMessages)
{
_ackedMessages = ackedMessages;
}
@@ -1590,9 +1567,9 @@ public class AMQChannel
{
try
{
- for(MessageInstance entry : _ackedMessages)
+ for(MessageConsumerAssociation association : _ackedMessages)
{
- entry.delete();
+ association.getMessageInstance().delete();
}
}
finally
@@ -1607,9 +1584,9 @@ public class AMQChannel
// explicit rollbacks resend the message after the rollback-ok is sent
if(_rollingBack)
{
- for(MessageInstance entry : _ackedMessages)
+ for(MessageConsumerAssociation association : _ackedMessages)
{
- entry.makeAcquisitionStealable();
+ association.getMessageInstance().makeAcquisitionStealable();
}
_resendList.addAll(_ackedMessages);
}
@@ -1617,9 +1594,10 @@ public class AMQChannel
{
try
{
- for(MessageInstance entry : _ackedMessages)
+ for(MessageConsumerAssociation association : _ackedMessages)
{
- entry.release(entry.getAcquiringConsumer());
+ final MessageInstance messageInstance = association.getMessageInstance();
+ messageInstance.release(association.getConsumer());
}
}
finally
@@ -1770,19 +1748,19 @@ public class AMQChannel
private void deadLetter(long deliveryTag)
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final UnacknowledgedMessageMap.Entry entry = unackedMap.remove(deliveryTag, true);
+ final MessageConsumerAssociation association = unackedMap.remove(deliveryTag, true);
- if (entry == null)
+ if (association == null)
{
_logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
}
else
{
- final MessageInstance messageInstance = entry.getMessageInstance();
+ final MessageInstance messageInstance = association.getMessageInstance();
final ServerMessage msg = messageInstance.getMessage();
int requeues = 0;
- if (messageInstance.makeAcquisitionUnstealable(entry.getConsumer()))
+ if (messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
{
requeues = messageInstance.routeToAlternate(new Action<MessageInstance>()
{
@@ -2046,8 +2024,9 @@ public class AMQChannel
_logger.debug("RECV[" + _channelId + "] BasicAck[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]");
}
- Collection<MessageInstance> ackedMessages = _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
- _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
+ Collection<MessageConsumerAssociation> ackedMessages = _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
+ final Collection<MessageInstance> messages = Collections2.transform(ackedMessages, MESSAGE_INSTANCE_FUNCTION);
+ _transaction.dequeue(messages, new MessageAcknowledgeAction(ackedMessages));
}
@Override
@@ -2573,19 +2552,19 @@ public class AMQChannel
_logger.debug("RECV[" + _channelId + "] BasicNack[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]");
}
- Map<Long, UnacknowledgedMessageMap.Entry> nackedMessageMap = new LinkedHashMap<>();
+ Map<Long, MessageConsumerAssociation> nackedMessageMap = new LinkedHashMap<>();
_unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap);
- for(UnacknowledgedMessageMap.Entry unackedEntry : nackedMessageMap.values())
+ for(MessageConsumerAssociation unackedMessageConsumerAssociation : nackedMessageMap.values())
{
- if (unackedEntry == null)
+ if (unackedMessageConsumerAssociation == null)
{
_logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag);
}
else
{
- MessageInstance message = unackedEntry.getMessageInstance();
+ MessageInstance message = unackedMessageConsumerAssociation.getMessageInstance();
if (message.getMessage() == null)
{
_logger.warn("Message has already been purged, unable to nack.");
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Tue Nov 29 14:53:02 2016
@@ -43,7 +43,6 @@ public class AMQMessage extends Abstract
public AMQMessage(StoredMessage<MessageMetaData> handle, Object connectionReference)
{
super(handle, connectionReference, handle.getMetaData().getContentSize());
- ;
}
public MessageMetaData getMessageMetaData()
Copied: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java (from r1771878, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java?p2=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java&p1=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java&r1=1771878&r2=1771912&rev=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java Tue Nov 29 14:53:02 2016
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,49 +17,17 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol.v0_8;
-import java.util.Collection;
-import java.util.Map;
+package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
-
-public interface UnacknowledgedMessageMap
+public interface MessageConsumerAssociation
{
- interface Visitor
- {
- /**
- * @param deliveryTag
- * @param message the message being iterated over @return true to stop iteration, false to continue
- */
- boolean callback(final long deliveryTag, MessageInstance message);
-
- void visitComplete();
- }
-
- void visit(Visitor visitor);
-
- void add(long deliveryTag, MessageInstance message, final ConsumerImpl target, final boolean usesCredit);
+ MessageInstance getMessageInstance();
- Entry remove(long deliveryTag, final boolean restoreCredit);
+ ConsumerImpl getConsumer();
- int size();
-
- MessageInstance get(long deliveryTag);
-
- Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
- void collect(long key, boolean multiple, Map<Long, Entry> msgs);
-
- interface Entry
- {
- MessageInstance getMessageInstance();
-
- ConsumerImpl getConsumer();
-
- long getSize();
- }
+ long getSize();
}
-
-
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Tue Nov 29 14:53:02 2016
@@ -31,11 +31,7 @@ public interface UnacknowledgedMessageMa
{
interface Visitor
{
- /**
- * @param deliveryTag
- * @param message the message being iterated over @return true to stop iteration, false to continue
- */
- boolean callback(final long deliveryTag, MessageInstance message);
+ boolean callback(final long deliveryTag, final MessageConsumerAssociation messageConsumerPair);
void visitComplete();
}
@@ -44,23 +40,14 @@ public interface UnacknowledgedMessageMa
void add(long deliveryTag, MessageInstance message, final ConsumerImpl target, final boolean usesCredit);
- Entry remove(long deliveryTag, final boolean restoreCredit);
+ MessageConsumerAssociation remove(long deliveryTag, final boolean restoreCredit);
int size();
MessageInstance get(long deliveryTag);
- Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
- void collect(long key, boolean multiple, Map<Long, Entry> msgs);
-
- interface Entry
- {
- MessageInstance getMessageInstance();
-
- ConsumerImpl getConsumer();
-
- long getSize();
- }
+ Collection<MessageConsumerAssociation> acknowledge(long deliveryTag, boolean multiple);
+ void collect(long key, boolean multiple, Map<Long, MessageConsumerAssociation> msgs);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Tue Nov 29 14:53:02 2016
@@ -33,13 +33,13 @@ import org.apache.qpid.server.util.Conne
class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
- private static final class EntryImpl implements Entry
+ private static final class MessageConsumerAssociationImpl implements MessageConsumerAssociation
{
private final MessageInstance _messageInstance;
private final ConsumerImpl _consumer;
private final boolean _usesCredit;
- private EntryImpl(final MessageInstance messageInstance, final ConsumerImpl consumer, final boolean usesCredit)
+ private MessageConsumerAssociationImpl(final MessageInstance messageInstance, final ConsumerImpl consumer, final boolean usesCredit)
{
_messageInstance = messageInstance;
_consumer = consumer;
@@ -69,11 +69,11 @@ class UnacknowledgedMessageMapImpl imple
return _usesCredit;
}
}
- private final Map<Long, EntryImpl> _map;
+ private final Map<Long, MessageConsumerAssociationImpl> _map;
// we keep this separately as it is accessed by the management thread
private volatile int _size;
- private CreditRestorer _creditRestorer;
+ private final CreditRestorer _creditRestorer;
UnacknowledgedMessageMapImpl(int prefetchLimit, CreditRestorer creditRestorer)
{
@@ -81,7 +81,7 @@ class UnacknowledgedMessageMapImpl imple
_creditRestorer = creditRestorer;
}
- public void collect(long deliveryTag, boolean multiple, Map<Long, Entry> msgs)
+ public void collect(long deliveryTag, boolean multiple, Map<Long, MessageConsumerAssociation> msgs)
{
if (multiple)
{
@@ -89,10 +89,10 @@ class UnacknowledgedMessageMapImpl imple
}
else
{
- final Entry entry = _map.get(deliveryTag);
- if(entry != null)
+ final MessageConsumerAssociation messageConsumerAssociation = _map.get(deliveryTag);
+ if(messageConsumerAssociation != null)
{
- msgs.put(deliveryTag, entry);
+ msgs.put(deliveryTag, messageConsumerAssociation);
}
}
@@ -106,9 +106,9 @@ class UnacknowledgedMessageMapImpl imple
}
}
- public Entry remove(long deliveryTag, final boolean restoreCredit)
+ public MessageConsumerAssociation remove(long deliveryTag, final boolean restoreCredit)
{
- EntryImpl entry = _map.remove(deliveryTag);
+ MessageConsumerAssociationImpl entry = _map.remove(deliveryTag);
if(entry != null)
{
_size--;
@@ -122,16 +122,16 @@ class UnacknowledgedMessageMapImpl imple
public void visit(Visitor visitor)
{
- for (Map.Entry<Long, EntryImpl> entry : _map.entrySet())
+ for (Map.Entry<Long, MessageConsumerAssociationImpl> entry : _map.entrySet())
{
- visitor.callback(entry.getKey(), entry.getValue().getMessageInstance());
+ visitor.callback(entry.getKey(), entry.getValue());
}
visitor.visitComplete();
}
public void add(long deliveryTag, MessageInstance message, final ConsumerImpl consumer, final boolean usesCredit)
{
- if(_map.put(deliveryTag, new EntryImpl(message, consumer, usesCredit)) == null)
+ if(_map.put(deliveryTag, new MessageConsumerAssociationImpl(message, consumer, usesCredit)) == null)
{
_size++;
}
@@ -148,35 +148,35 @@ class UnacknowledgedMessageMapImpl imple
public MessageInstance get(long key)
{
- Entry messageTargetPair = _map.get(key);
- return messageTargetPair == null ? null : messageTargetPair.getMessageInstance();
+ MessageConsumerAssociation association = _map.get(key);
+ return association == null ? null : association.getMessageInstance();
}
- public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
+ public Collection<MessageConsumerAssociation> acknowledge(long deliveryTag, boolean multiple)
{
if(multiple)
{
- Map<Long, Entry> ackedMessageMap = new LinkedHashMap<>();
+ Map<Long, MessageConsumerAssociation> ackedMessageMap = new LinkedHashMap<>();
collect(deliveryTag, multiple, ackedMessageMap);
remove(ackedMessageMap.keySet());
- List<MessageInstance> acknowledged = new ArrayList<>();
- for (Entry entry : ackedMessageMap.values())
+ List<MessageConsumerAssociation> acknowledged = new ArrayList<>();
+ for (MessageConsumerAssociation messageConsumerAssociation : ackedMessageMap.values())
{
- MessageInstance instance = entry.getMessageInstance();
- if (instance.makeAcquisitionUnstealable(entry.getConsumer()))
+ MessageInstance instance = messageConsumerAssociation.getMessageInstance();
+ if (instance.makeAcquisitionUnstealable(messageConsumerAssociation.getConsumer()))
{
- acknowledged.add(instance);
+ acknowledged.add(messageConsumerAssociation);
}
}
return acknowledged;
}
else
{
- MessageInstance instance;
- final Entry entry = remove(deliveryTag, true);
- if(entry != null && (instance = entry.getMessageInstance()).makeAcquisitionUnstealable(entry.getConsumer()))
+ final MessageConsumerAssociation association = remove(deliveryTag, true);
+ final MessageInstance messageInstance = association.getMessageInstance();
+ if(association != null && messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
{
- return Collections.singleton(instance);
+ return Collections.singleton(association);
}
else
{
@@ -186,9 +186,9 @@ class UnacknowledgedMessageMapImpl imple
}
}
- private void collect(long key, Map<Long, Entry> msgs)
+ private void collect(long key, Map<Long, MessageConsumerAssociation> msgs)
{
- for (Map.Entry<Long, EntryImpl> entry : _map.entrySet())
+ for (Map.Entry<Long, MessageConsumerAssociationImpl> entry : _map.entrySet())
{
msgs.put(entry.getKey(), entry.getValue());
if (entry.getKey() == key)
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Tue Nov 29 14:53:02 2016
@@ -25,6 +25,9 @@ import static org.mockito.Mockito.when;
import java.util.Collection;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -32,6 +35,16 @@ import org.apache.qpid.test.utils.QpidTe
public class UnacknowledgedMessageMapTest extends QpidTestCase
{
+ public static final Function<MessageConsumerAssociation, MessageInstance>
+ MESSAGE_INSTANCE_FUNCTION = new Function<MessageConsumerAssociation, MessageInstance>()
+
+ {
+ @Override
+ public MessageInstance apply(final MessageConsumerAssociation input)
+ {
+ return input.getMessageInstance();
+ }
+ };
private final ConsumerImpl _consumer = mock(ConsumerImpl.class);
public void testDeletedMessagesCantBeAcknowledged()
@@ -40,12 +53,13 @@ public class UnacknowledgedMessageMapTes
final int expectedSize = 5;
MessageInstance[] msgs = populateMap(map,expectedSize);
assertEquals(expectedSize,map.size());
- Collection<MessageInstance> acknowledged = map.acknowledge(100, true);
+ Collection<MessageConsumerAssociation> acknowledged = map.acknowledge(100, true);
+ Collection<MessageInstance> acknowledgedMessages = Collections2.transform(acknowledged, MESSAGE_INSTANCE_FUNCTION);
assertEquals(expectedSize, acknowledged.size());
assertEquals(0,map.size());
for(int i = 0; i < expectedSize; i++)
{
- assertTrue("Message " + i + " is missing", acknowledged.contains(msgs[i]));
+ assertTrue("Message " + i + " is missing", acknowledgedMessages.contains(msgs[i]));
}
map = new UnacknowledgedMessageMapImpl(100, mock(CreditRestorer.class));
@@ -58,11 +72,12 @@ public class UnacknowledgedMessageMapTes
acknowledged = map.acknowledge(100, true);
+ acknowledgedMessages = Collections2.transform(acknowledged, MESSAGE_INSTANCE_FUNCTION);
assertEquals(expectedSize-2, acknowledged.size());
assertEquals(0,map.size());
for(int i = 0; i < expectedSize; i++)
{
- assertEquals(i != 2 && i != 4, acknowledged.contains(msgs[i]));
+ assertEquals(i != 2 && i != 4, acknowledgedMessages.contains(msgs[i]));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org