You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/26 18:34:40 UTC
svn commit: r1771508 - in
/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src:
main/java/org/apache/qpid/server/protocol/v0_8/
test/java/org/apache/qpid/server/protocol/v0_8/
Author: rgodfrey
Date: Sat Nov 26 18:34:40 2016
New Revision: 1771508
URL: http://svn.apache.org/viewvc?rev=1771508&view=rev
Log:
QPID-7425 : Properly handle messages delivered due to basic-get; improve the check that released messages are still held by the same consumer
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Nov 26 18:34:40 2016
@@ -299,9 +299,9 @@ public class AMQChannel
if (acks)
{
- target = ConsumerTarget_0_8.createAckTarget(this,
- AMQShortString.EMPTY_STRING, null,
- INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
+ target = ConsumerTarget_0_8.createGetAckTarget(this,
+ AMQShortString.EMPTY_STRING, null,
+ INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
}
else
{
@@ -935,13 +935,16 @@ public class AMQChannel
/**
* Add a message to the channel-based list of unacknowledged messages
- *
- * @param entry the record of the message on the queue that was delivered
+ * @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
* @param consumer The consumer that is to acknowledge this message.
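+ * @param usesCredit whether credit may be restored for this message when it is later removed from the unacknowledged message map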
*/
- public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer)
+ public void addUnacknowledgedMessage(MessageInstance entry,
+ long deliveryTag,
+ ConsumerImpl consumer,
+ final boolean usesCredit)
{
if (_logger.isDebugEnabled())
{
@@ -949,7 +952,7 @@ public class AMQChannel
+ ") for " + consumer + " on " + entry.getOwningResource().getName());
}
- _unacknowledgedMessageMap.add(deliveryTag, entry, (ConsumerTarget_0_8) consumer.getTarget());
+ _unacknowledgedMessageMap.add(deliveryTag, entry, consumer, usesCredit);
}
@@ -1015,15 +1018,17 @@ public class AMQChannel
*/
public void requeue(long deliveryTag)
{
- MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag, true);
- if (unacked != null)
+ final UnacknowledgedMessageMap.Entry entry = _unacknowledgedMessageMap.remove(deliveryTag, true);
+
+ if (entry != null)
{
+ MessageInstance unacked = entry.getMessageInstance();
// Mark message redelivered
unacked.setRedelivered();
// Ensure message is released for redelivery
- unacked.release(unacked.getAcquiringConsumer());
+ unacked.release(entry.getConsumer());
}
else
{
@@ -1765,19 +1770,21 @@ public class AMQChannel
private void deadLetter(long deliveryTag)
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag, true);
+ final UnacknowledgedMessageMap.Entry entry = unackedMap.remove(deliveryTag, true);
- if (rejectedQueueEntry == null)
+ if (entry == null)
{
_logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
}
else
{
- final ServerMessage msg = rejectedQueueEntry.getMessage();
+
+ final MessageInstance messageInstance = entry.getMessageInstance();
+ final ServerMessage msg = messageInstance.getMessage();
int requeues = 0;
- if (rejectedQueueEntry.makeAcquisitionUnstealable(rejectedQueueEntry.getAcquiringConsumer()))
+ if (messageInstance.makeAcquisitionUnstealable(entry.getConsumer()))
{
- requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
+ requeues = messageInstance.routeToAlternate(new Action<MessageInstance>()
{
@Override
public void performAction(final MessageInstance requeueEntry)
@@ -1792,7 +1799,7 @@ public class AMQChannel
if(requeues == 0)
{
- final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource();
+ final TransactionLogResource owningResource = messageInstance.getOwningResource();
if(owningResource instanceof Queue)
{
final Queue<?> queue = (Queue<?>) owningResource;
@@ -2566,19 +2573,19 @@ public class AMQChannel
_logger.debug("RECV[" + _channelId + "] BasicNack[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]");
}
- Map<Long, MessageInstance> nackedMessageMap = new LinkedHashMap<>();
+ Map<Long, UnacknowledgedMessageMap.Entry> nackedMessageMap = new LinkedHashMap<>();
_unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap);
- for(MessageInstance message : nackedMessageMap.values())
+ for(UnacknowledgedMessageMap.Entry unackedEntry : nackedMessageMap.values())
{
- if (message == null)
+ if (unackedEntry == null)
{
_logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag);
}
else
{
-
+ MessageInstance message = unackedEntry.getMessageInstance();
if (message.getMessage() == null)
{
_logger.warn("Message has already been purged, unable to nack.");
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sat Nov 26 18:34:40 2016
@@ -209,27 +209,31 @@ public abstract class ConsumerTarget_0_8
consumerTag,
filters, creditManager,
channel.getClientDeliveryMethod(),
- multiQueue);
+ multiQueue, true);
}
- public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager_0_8 creditManager,
- ClientDeliveryMethod deliveryMethod)
+ public static ConsumerTarget_0_8 createGetAckTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager_0_8 creditManager,
+ ClientDeliveryMethod deliveryMethod)
{
- return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, false);
+ return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, false, false);
}
static final class AckConsumer extends ConsumerTarget_0_8
{
+ private final boolean _usesCredit;
+
public AckConsumer(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager_0_8 creditManager,
ClientDeliveryMethod deliveryMethod,
- boolean multiQueue)
+ boolean multiQueue,
+ final boolean usesCredit)
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, multiQueue);
+ _usesCredit = usesCredit;
}
/**
@@ -252,7 +256,7 @@ public abstract class ConsumerTarget_0_8
long deliveryTag = getChannel().getNextDeliveryTag();
addUnacknowledgedMessage(entry);
- getChannel().addUnacknowledgedMessage(entry, deliveryTag, consumer);
+ getChannel().addUnacknowledgedMessage(entry, deliveryTag, consumer, _usesCredit);
long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Sat Nov 26 18:34:40 2016
@@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.
import java.util.Collection;
import java.util.Map;
-import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -42,16 +42,25 @@ public interface UnacknowledgedMessageMa
void visit(Visitor visitor);
- void add(long deliveryTag, MessageInstance message, final ConsumerTarget target);
+ void add(long deliveryTag, MessageInstance message, final ConsumerImpl target, final boolean usesCredit);
- MessageInstance remove(long deliveryTag, final boolean restoreCredit);
+ Entry remove(long deliveryTag, final boolean restoreCredit);
int size();
MessageInstance get(long deliveryTag);
Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
- void collect(long key, boolean multiple, Map<Long, MessageInstance> msgs);
+ void collect(long key, boolean multiple, Map<Long, Entry> msgs);
+
+ interface Entry
+ {
+ MessageInstance getMessageInstance();
+
+ ConsumerImpl getConsumer();
+
+ long getSize();
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Sat Nov 26 18:34:40 2016
@@ -27,39 +27,49 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
- private static final class MessageTargetPair
+ private static final class EntryImpl implements Entry
{
private final MessageInstance _messageInstance;
- private final ConsumerTarget _target;
+ private final ConsumerImpl _consumer;
+ private final boolean _usesCredit;
- private MessageTargetPair(final MessageInstance messageInstance, final ConsumerTarget target)
+ private EntryImpl(final MessageInstance messageInstance, final ConsumerImpl consumer, final boolean usesCredit)
{
_messageInstance = messageInstance;
- _target = target;
+ _consumer = consumer;
+ _usesCredit = usesCredit;
}
+ @Override
public MessageInstance getMessageInstance()
{
return _messageInstance;
}
- public ConsumerTarget getTarget()
+ @Override
+ public ConsumerImpl getConsumer()
{
- return _target;
+ return _consumer;
}
+ @Override
public long getSize()
{
return _messageInstance.getMessage().getSize();
}
+
+ public boolean isUsesCredit()
+ {
+ return _usesCredit;
+ }
}
- private final Map<Long, MessageTargetPair> _map;
+ private final Map<Long, EntryImpl> _map;
// we keep this separately as it is accessed by the management thread
private volatile int _size;
@@ -71,7 +81,7 @@ class UnacknowledgedMessageMapImpl imple
_creditRestorer = creditRestorer;
}
- public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
+ public void collect(long deliveryTag, boolean multiple, Map<Long, Entry> msgs)
{
if (multiple)
{
@@ -79,7 +89,7 @@ class UnacknowledgedMessageMapImpl imple
}
else
{
- final MessageInstance entry = get(deliveryTag);
+ final Entry entry = _map.get(deliveryTag);
if(entry != null)
{
msgs.put(deliveryTag, entry);
@@ -88,40 +98,40 @@ class UnacknowledgedMessageMapImpl imple
}
- public void remove(Map<Long,MessageInstance> msgs)
+ private void remove(Collection<Long> msgs)
{
- for (Long deliveryTag : msgs.keySet())
+ for (Long deliveryTag : msgs)
{
remove(deliveryTag, true);
}
}
- public MessageInstance remove(long deliveryTag, final boolean restoreCredit)
+ public Entry remove(long deliveryTag, final boolean restoreCredit)
{
- MessageTargetPair message = _map.remove(deliveryTag);
- if(message != null)
+ EntryImpl entry = _map.remove(deliveryTag);
+ if(entry != null)
{
_size--;
- if(restoreCredit)
+ if(restoreCredit && entry.isUsesCredit())
{
- _creditRestorer.restoreCredit(message.getTarget(), 1, message.getSize());
+ _creditRestorer.restoreCredit(entry.getConsumer().getTarget(), 1, entry.getSize());
}
}
- return message.getMessageInstance();
+ return entry;
}
public void visit(Visitor visitor)
{
- for (Map.Entry<Long, MessageTargetPair> entry : _map.entrySet())
+ for (Map.Entry<Long, EntryImpl> entry : _map.entrySet())
{
visitor.callback(entry.getKey(), entry.getValue().getMessageInstance());
}
visitor.visitComplete();
}
- public void add(long deliveryTag, MessageInstance message, final ConsumerTarget target)
+ public void add(long deliveryTag, MessageInstance message, final ConsumerImpl consumer, final boolean usesCredit)
{
- if(_map.put(deliveryTag, new MessageTargetPair(message,target)) == null)
+ if(_map.put(deliveryTag, new EntryImpl(message, consumer, usesCredit)) == null)
{
_size++;
}
@@ -138,7 +148,7 @@ class UnacknowledgedMessageMapImpl imple
public MessageInstance get(long key)
{
- MessageTargetPair messageTargetPair = _map.get(key);
+ Entry messageTargetPair = _map.get(key);
return messageTargetPair == null ? null : messageTargetPair.getMessageInstance();
}
@@ -146,13 +156,14 @@ class UnacknowledgedMessageMapImpl imple
{
if(multiple)
{
- Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<>();
+ Map<Long, Entry> ackedMessageMap = new LinkedHashMap<>();
collect(deliveryTag, multiple, ackedMessageMap);
- remove(ackedMessageMap);
+ remove(ackedMessageMap.keySet());
List<MessageInstance> acknowledged = new ArrayList<>();
- for (MessageInstance instance : ackedMessageMap.values())
+ for (Entry entry : ackedMessageMap.values())
{
- if (instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
+ MessageInstance instance = entry.getMessageInstance();
+ if (instance.makeAcquisitionUnstealable(entry.getConsumer()))
{
acknowledged.add(instance);
}
@@ -162,8 +173,8 @@ class UnacknowledgedMessageMapImpl imple
else
{
MessageInstance instance;
- instance = remove(deliveryTag, true);
- if(instance != null && instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
+ final Entry entry = remove(deliveryTag, true);
+ if(entry != null && (instance = entry.getMessageInstance()).makeAcquisitionUnstealable(entry.getConsumer()))
{
return Collections.singleton(instance);
}
@@ -175,11 +186,11 @@ class UnacknowledgedMessageMapImpl imple
}
}
- private void collect(long key, Map<Long, MessageInstance> msgs)
+ private void collect(long key, Map<Long, Entry> msgs)
{
- for (Map.Entry<Long, MessageTargetPair> entry : _map.entrySet())
+ for (Map.Entry<Long, EntryImpl> entry : _map.entrySet())
{
- msgs.put(entry.getKey(), entry.getValue().getMessageInstance());
+ msgs.put(entry.getKey(), entry.getValue());
if (entry.getKey() == key)
{
break;
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Sat Nov 26 18:34:40 2016
@@ -33,7 +33,6 @@ import org.mockito.stubbing.Answer;
import org.apache.qpid.QpidException;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
@@ -77,7 +76,7 @@ public class ExtractResendAndRequeueTest
when(_queue.isDeleted()).thenReturn(_queueDeleted);
_consumer = mock(ConsumerImpl.class);
when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
- ConsumerTarget target = mock(ConsumerTarget.class);
+ ConsumerImpl consumer = mock(ConsumerImpl.class);
long id = 0;
@@ -100,7 +99,7 @@ public class ExtractResendAndRequeueTest
}
}).when(entry).delete();
- _unacknowledgedMessageMap.add(id, entry, target);
+ _unacknowledgedMessageMap.add(id, entry, consumer, true);
_referenceList.add(entry);
//Increment ID;
id++;
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Sat Nov 26 18:34:40 2016
@@ -73,7 +73,7 @@ public class UnacknowledgedMessageMapTes
for(int i = 0; i < size; i++)
{
msgs[i] = createMessageInstance(i);
- map.add((long)i, msgs[i], null);
+ map.add((long)i, msgs[i], _consumer, true);
}
return msgs;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org