You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/11/29 14:53:02 UTC

svn commit: r1771912 - in /qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src: main/java/org/apache/qpid/server/protocol/v0_8/ test/java/org/apache/qpid/server/protocol/v0_8/

Author: kwall
Date: Tue Nov 29 14:53:02 2016
New Revision: 1771912

URL: http://svn.apache.org/viewvc?rev=1771912&view=rev
Log:
QPID-7425: [Java Broker] Resend message only if message can be made unsteallable for the current consumer

Added:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java
      - copied, changed from r1771878, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
Removed:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
Modified:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Nov 29 14:53:02 2016
@@ -46,6 +46,8 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,6 +86,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.protocol.PublishAuthorisationCache;
+import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap.Visitor;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.MessageHandle;
@@ -111,7 +114,16 @@ public class AMQChannel
     public static final int DEFAULT_PREFETCH = 4096;
 
     private static final Logger _logger = LoggerFactory.getLogger(AMQChannel.class);
-    public static final InfiniteCreditCreditManager INFINITE_CREDIT_CREDIT_MANAGER = new InfiniteCreditCreditManager();
+    private static final InfiniteCreditCreditManager INFINITE_CREDIT_CREDIT_MANAGER = new InfiniteCreditCreditManager();
+    private static final Function<MessageConsumerAssociation, MessageInstance>
+            MESSAGE_INSTANCE_FUNCTION = new Function<MessageConsumerAssociation, MessageInstance>()
+    {
+        @Override
+        public MessageInstance apply(final MessageConsumerAssociation input)
+        {
+            return input.getMessageInstance();
+        }
+    };
     private final DefaultQueueAssociationClearingTask
             _defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
 
@@ -178,7 +190,7 @@ public class AMQChannel
     private LogSubject _logSubject;
     private volatile boolean _rollingBack;
 
-    private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
+    private List<MessageConsumerAssociation> _resendList = new ArrayList<>();
     private static final
     AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
 
@@ -970,14 +982,13 @@ public class AMQChannel
      */
     private void requeue()
     {
-        final Map<Long, MessageInstance> unackedMapCopy = new LinkedHashMap<>();
-        // we must create a new map since all the messages will get a new delivery tag when they are redelivered
-        _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+        final Map<Long, MessageConsumerAssociation> copy = new LinkedHashMap<>();
+        _unacknowledgedMessageMap.visit(new Visitor()
         {
             @Override
-            public boolean callback(final long deliveryTag, final MessageInstance message)
+            public boolean callback(final long deliveryTag, final MessageConsumerAssociation messageConsumerPair)
             {
-                unackedMapCopy.put(deliveryTag, message);
+                copy.put(deliveryTag, messageConsumerPair);
                 return false;
             }
 
@@ -988,24 +999,24 @@ public class AMQChannel
             }
         });
 
-        if (!unackedMapCopy.isEmpty())
+        if (!copy.isEmpty())
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Requeuing " + unackedMapCopy.size() + " unacked messages. for " + toString());
+                _logger.debug("Requeuing {} unacked messages", copy.size());
             }
-
         }
 
-        for (Map.Entry<Long,MessageInstance> entry : unackedMapCopy.entrySet())
+        for (Map.Entry<Long, MessageConsumerAssociation> entry : copy.entrySet())
         {
-            MessageInstance unacked = entry.getValue();
+            MessageInstance unacked = entry.getValue().getMessageInstance();
+            ConsumerImpl consumer = entry.getValue().getConsumer();
             // Mark message redelivered
             unacked.setRedelivered();
             // here we wish to restore credit
             _unacknowledgedMessageMap.remove(entry.getKey(), true);
             // Ensure message is released for redelivery
-            unacked.release(unacked.getAcquiringConsumer());
+            unacked.release(consumer);
         }
 
     }
@@ -1019,22 +1030,20 @@ public class AMQChannel
     public void requeue(long deliveryTag)
     {
 
-        final UnacknowledgedMessageMap.Entry entry = _unacknowledgedMessageMap.remove(deliveryTag, true);
+        final MessageConsumerAssociation association = _unacknowledgedMessageMap.remove(deliveryTag, true);
 
-        if (entry != null)
+        if (association != null)
         {
-            MessageInstance unacked = entry.getMessageInstance();
+            MessageInstance unacked = association.getMessageInstance();
             // Mark message redelivered
             unacked.setRedelivered();
 
             // Ensure message is released for redelivery
-            unacked.release(entry.getConsumer());
+            unacked.release(association.getConsumer());
         }
         else
         {
-            _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
-                      + _unacknowledgedMessageMap.size());
-
+            _logger.warn("Requested requeue of message: {} but no such delivery tag exists.", deliveryTag);
         }
 
     }
@@ -1064,97 +1073,76 @@ public class AMQChannel
         return false;
     }
 
-    private boolean resendInternal(MessageInstance messageInstance)
-    {
-        ConsumerImpl subscriber = messageInstance.getDeliveredConsumer();
-        if (subscriber != null && subscriber.getSessionModel() == this)
-        {
-            ConsumerTarget target = subscriber.getTarget();
-            if (target.getState() != ConsumerTarget.State.CLOSED)
-            {
-                target.send(subscriber, messageInstance, false);
-                return true;
-            }
-
-        }
-        return false;
-    }
-
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
      */
     private void resend()
     {
-
-
-        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
-        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageConsumerAssociation> msgToRequeue = new LinkedHashMap<>();
+        final Map<Long, MessageConsumerAssociation> msgToResend = new LinkedHashMap<>();
 
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
+            _logger.debug("Unacknowledged messages: {}", _unacknowledgedMessageMap.size());
         }
 
-        // Process the Unacked-Map.
-        // Marking messages who still have a consumer for to be resent
-        // and those that don't to be requeued.
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
-                                                                    msgToRequeue,
-                                                                    msgToResend
-        ));
-
-
-        // Process Messages to Resend
-        if (_logger.isDebugEnabled())
+        _unacknowledgedMessageMap.visit(new Visitor()
         {
-            if (!msgToResend.isEmpty())
+            @Override
+            public boolean callback(final long deliveryTag, final MessageConsumerAssociation association)
             {
-                _logger.debug("Preparing (" + msgToResend.size() + ") message to resend.");
+
+                if (association.getConsumer().isClosed())
+                {
+                    // consumer has gone
+                    msgToRequeue.put(deliveryTag, association);
+                }
+                else
+                {
+                    // Consumer still exists
+                    msgToResend.put(deliveryTag,  association);
+                }
+                return false;
             }
-            else
+
+            @Override
+            public void visitComplete()
             {
-                _logger.debug("No message to resend.");
             }
-        }
+        });
 
-        for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet())
+
+        for (Map.Entry<Long, MessageConsumerAssociation> entry : msgToResend.entrySet())
         {
-            MessageInstance message = entry.getValue();
             long deliveryTag = entry.getKey();
-
-            //Amend the delivery counter as the client hasn't seen these messages yet.
-            message.decrementDeliveryCount();
+            MessageInstance message = entry.getValue().getMessageInstance();
+            ConsumerImpl consumer = entry.getValue().getConsumer();
 
             // Without any details from the client about what has been processed we have to mark
             // all messages in the unacked map as redelivered.
             message.setRedelivered();
 
-            if (resendInternal(message))
+            if (message.makeAcquisitionUnstealable(consumer))
             {
+                message.decrementDeliveryCount();
+
+                consumer.getTarget().send(consumer, message, false);
                 // remove from unacked map - don't want to restore credit though(!)
                 _unacknowledgedMessageMap.remove(deliveryTag, false);
             }
             else
             {
-                msgToRequeue.put(deliveryTag, message);
-            }
-        } // for all messages
-        // } else !isSuspend
-
-        if (_logger.isDebugEnabled())
-        {
-            if (!msgToRequeue.isEmpty())
-            {
-                _logger.debug("Preparing (" + msgToRequeue.size() + ") message to requeue");
+                msgToRequeue.put(deliveryTag, entry.getValue());
             }
         }
 
         // Process Messages to Requeue at the front of the queue
-        for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet())
+        for (Map.Entry<Long, MessageConsumerAssociation> entry : msgToRequeue.entrySet())
         {
-            MessageInstance message = entry.getValue();
             long deliveryTag = entry.getKey();
+            MessageInstance message = entry.getValue().getMessageInstance();
+            ConsumerImpl consumer = entry.getValue().getConsumer();
 
             //Amend the delivery counter as the client hasn't seen these messages yet.
             message.decrementDeliveryCount();
@@ -1163,18 +1151,12 @@ public class AMQChannel
             _unacknowledgedMessageMap.remove(deliveryTag, true);
 
             message.setRedelivered();
-            message.release(message.getAcquiringConsumer());
-
+            message.release(consumer);
         }
     }
 
 
-    /**
-     * Used only for testing purposes.
-     *
-     * @return the map of unacknowledged messages
-     */
-    public UnacknowledgedMessageMap getUnacknowledgedMessageMap()
+    private UnacknowledgedMessageMap getUnacknowledgedMessageMap()
     {
         return _unacknowledgedMessageMap;
     }
@@ -1222,13 +1204,7 @@ public class AMQChannel
         }
     }
 
-    public boolean isSuspended()
-    {
-        return _suspended.get()  || _closing.get() || _connection.isClosing();
-    }
-
-
-    public void commit(final Runnable immediateAction, boolean async)
+    private void commit(final Runnable immediateAction, boolean async)
     {
 
 
@@ -1294,16 +1270,17 @@ public class AMQChannel
 
         postRollbackTask.run();
 
-        for(MessageInstance entry : _resendList)
+        for(MessageConsumerAssociation association : _resendList)
         {
-            ConsumerImpl sub = entry.getAcquiringConsumer();
-            if (sub == null || sub.isClosed())
+            final MessageInstance messageInstance = association.getMessageInstance();
+            final ConsumerImpl consumer = association.getConsumer();
+            if (consumer.isClosed())
             {
-                entry.release(sub);
+                messageInstance.release(consumer);
             }
             else
             {
-                resendInternal(entry);
+                consumer.getTarget().send(consumer, messageInstance, false);
             }
         }
         _resendList.clear();
@@ -1579,9 +1556,9 @@ public class AMQChannel
 
     private class MessageAcknowledgeAction implements ServerTransaction.Action
     {
-        private Collection<MessageInstance> _ackedMessages;
+        private Collection<MessageConsumerAssociation> _ackedMessages;
 
-        public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages)
+        public MessageAcknowledgeAction(Collection<MessageConsumerAssociation> ackedMessages)
         {
             _ackedMessages = ackedMessages;
         }
@@ -1590,9 +1567,9 @@ public class AMQChannel
         {
             try
             {
-                for(MessageInstance entry : _ackedMessages)
+                for(MessageConsumerAssociation association : _ackedMessages)
                 {
-                    entry.delete();
+                    association.getMessageInstance().delete();
                 }
             }
             finally
@@ -1607,9 +1584,9 @@ public class AMQChannel
             // explicit rollbacks resend the message after the rollback-ok is sent
             if(_rollingBack)
             {
-                for(MessageInstance entry : _ackedMessages)
+                for(MessageConsumerAssociation association : _ackedMessages)
                 {
-                    entry.makeAcquisitionStealable();
+                    association.getMessageInstance().makeAcquisitionStealable();
                 }
                 _resendList.addAll(_ackedMessages);
             }
@@ -1617,9 +1594,10 @@ public class AMQChannel
             {
                 try
                 {
-                    for(MessageInstance entry : _ackedMessages)
+                    for(MessageConsumerAssociation association : _ackedMessages)
                     {
-                        entry.release(entry.getAcquiringConsumer());
+                        final MessageInstance messageInstance = association.getMessageInstance();
+                        messageInstance.release(association.getConsumer());
                     }
                 }
                 finally
@@ -1770,19 +1748,19 @@ public class AMQChannel
     private void deadLetter(long deliveryTag)
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
-        final UnacknowledgedMessageMap.Entry entry = unackedMap.remove(deliveryTag, true);
+        final MessageConsumerAssociation association = unackedMap.remove(deliveryTag, true);
 
-        if (entry == null)
+        if (association == null)
         {
             _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
         }
         else
         {
 
-            final MessageInstance messageInstance = entry.getMessageInstance();
+            final MessageInstance messageInstance = association.getMessageInstance();
             final ServerMessage msg = messageInstance.getMessage();
             int requeues = 0;
-            if (messageInstance.makeAcquisitionUnstealable(entry.getConsumer()))
+            if (messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
             {
                 requeues = messageInstance.routeToAlternate(new Action<MessageInstance>()
                 {
@@ -2046,8 +2024,9 @@ public class AMQChannel
             _logger.debug("RECV[" + _channelId + "] BasicAck[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]");
         }
 
-        Collection<MessageInstance> ackedMessages = _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
-        _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
+        Collection<MessageConsumerAssociation> ackedMessages = _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
+        final Collection<MessageInstance> messages = Collections2.transform(ackedMessages, MESSAGE_INSTANCE_FUNCTION);
+        _transaction.dequeue(messages, new MessageAcknowledgeAction(ackedMessages));
     }
 
     @Override
@@ -2573,19 +2552,19 @@ public class AMQChannel
             _logger.debug("RECV[" + _channelId + "] BasicNack[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]");
         }
 
-        Map<Long, UnacknowledgedMessageMap.Entry> nackedMessageMap = new LinkedHashMap<>();
+        Map<Long, MessageConsumerAssociation> nackedMessageMap = new LinkedHashMap<>();
         _unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap);
 
-        for(UnacknowledgedMessageMap.Entry unackedEntry : nackedMessageMap.values())
+        for(MessageConsumerAssociation unackedMessageConsumerAssociation : nackedMessageMap.values())
         {
 
-            if (unackedEntry == null)
+            if (unackedMessageConsumerAssociation == null)
             {
                 _logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag);
             }
             else
             {
-                MessageInstance message = unackedEntry.getMessageInstance();
+                MessageInstance message = unackedMessageConsumerAssociation.getMessageInstance();
                 if (message.getMessage() == null)
                 {
                     _logger.warn("Message has already been purged, unable to nack.");

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Tue Nov 29 14:53:02 2016
@@ -43,7 +43,6 @@ public class AMQMessage extends Abstract
     public AMQMessage(StoredMessage<MessageMetaData> handle, Object connectionReference)
     {
         super(handle, connectionReference, handle.getMetaData().getContentSize());
-        ;
     }
 
     public MessageMetaData getMessageMetaData()

Copied: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java (from r1771878, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java?p2=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java&p1=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java&r1=1771878&r2=1771912&rev=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java Tue Nov 29 14:53:02 2016
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,49 +17,17 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol.v0_8;
 
-import java.util.Collection;
-import java.util.Map;
+package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 
-
-public interface UnacknowledgedMessageMap
+public interface MessageConsumerAssociation
 {
-    interface Visitor
-    {
-        /**
-         * @param deliveryTag
-         * @param message the message being iterated over @return true to stop iteration, false to continue
-         */
-        boolean callback(final long deliveryTag, MessageInstance message);
-
-        void visitComplete();
-    }
-
-    void visit(Visitor visitor);
-
-    void add(long deliveryTag, MessageInstance message, final ConsumerImpl target, final boolean usesCredit);
+    MessageInstance getMessageInstance();
 
-    Entry remove(long deliveryTag, final boolean restoreCredit);
+    ConsumerImpl getConsumer();
 
-    int size();
-
-    MessageInstance get(long deliveryTag);
-
-    Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
-    void collect(long key, boolean multiple, Map<Long, Entry> msgs);
-
-    interface Entry
-    {
-        MessageInstance getMessageInstance();
-
-        ConsumerImpl getConsumer();
-
-        long getSize();
-    }
+    long getSize();
 }
-
-

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Tue Nov 29 14:53:02 2016
@@ -31,11 +31,7 @@ public interface UnacknowledgedMessageMa
 {
     interface Visitor
     {
-        /**
-         * @param deliveryTag
-         * @param message the message being iterated over @return true to stop iteration, false to continue
-         */
-        boolean callback(final long deliveryTag, MessageInstance message);
+        boolean callback(final long deliveryTag, final MessageConsumerAssociation messageConsumerPair);
 
         void visitComplete();
     }
@@ -44,23 +40,14 @@ public interface UnacknowledgedMessageMa
 
     void add(long deliveryTag, MessageInstance message, final ConsumerImpl target, final boolean usesCredit);
 
-    Entry remove(long deliveryTag, final boolean restoreCredit);
+    MessageConsumerAssociation remove(long deliveryTag, final boolean restoreCredit);
 
     int size();
 
     MessageInstance get(long deliveryTag);
 
-    Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
-    void collect(long key, boolean multiple, Map<Long, Entry> msgs);
-
-    interface Entry
-    {
-        MessageInstance getMessageInstance();
-
-        ConsumerImpl getConsumer();
-
-        long getSize();
-    }
+    Collection<MessageConsumerAssociation> acknowledge(long deliveryTag, boolean multiple);
+    void collect(long key, boolean multiple, Map<Long, MessageConsumerAssociation> msgs);
 }
 
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Tue Nov 29 14:53:02 2016
@@ -33,13 +33,13 @@ import org.apache.qpid.server.util.Conne
 
 class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
 {
-    private static final class EntryImpl implements Entry
+    private static final class MessageConsumerAssociationImpl implements MessageConsumerAssociation
     {
         private final MessageInstance _messageInstance;
         private final ConsumerImpl _consumer;
         private final boolean _usesCredit;
 
-        private EntryImpl(final MessageInstance messageInstance, final ConsumerImpl consumer, final boolean usesCredit)
+        private MessageConsumerAssociationImpl(final MessageInstance messageInstance, final ConsumerImpl consumer, final boolean usesCredit)
         {
             _messageInstance = messageInstance;
             _consumer = consumer;
@@ -69,11 +69,11 @@ class UnacknowledgedMessageMapImpl imple
             return _usesCredit;
         }
     }
-    private final Map<Long, EntryImpl> _map;
+    private final Map<Long, MessageConsumerAssociationImpl> _map;
     // we keep this separately as it is accessed by the management thread
     private volatile int _size;
 
-    private CreditRestorer _creditRestorer;
+    private final CreditRestorer _creditRestorer;
 
     UnacknowledgedMessageMapImpl(int prefetchLimit, CreditRestorer creditRestorer)
     {
@@ -81,7 +81,7 @@ class UnacknowledgedMessageMapImpl imple
         _creditRestorer = creditRestorer;
     }
 
-    public void collect(long deliveryTag, boolean multiple, Map<Long, Entry> msgs)
+    public void collect(long deliveryTag, boolean multiple, Map<Long, MessageConsumerAssociation> msgs)
     {
         if (multiple)
         {
@@ -89,10 +89,10 @@ class UnacknowledgedMessageMapImpl imple
         }
         else
         {
-            final Entry entry = _map.get(deliveryTag);
-            if(entry != null)
+            final MessageConsumerAssociation messageConsumerAssociation = _map.get(deliveryTag);
+            if(messageConsumerAssociation != null)
             {
-                msgs.put(deliveryTag, entry);
+                msgs.put(deliveryTag, messageConsumerAssociation);
             }
         }
 
@@ -106,9 +106,9 @@ class UnacknowledgedMessageMapImpl imple
         }
     }
 
-    public Entry remove(long deliveryTag, final boolean restoreCredit)
+    public MessageConsumerAssociation remove(long deliveryTag, final boolean restoreCredit)
     {
-        EntryImpl entry = _map.remove(deliveryTag);
+        MessageConsumerAssociationImpl entry = _map.remove(deliveryTag);
         if(entry != null)
         {
             _size--;
@@ -122,16 +122,16 @@ class UnacknowledgedMessageMapImpl imple
 
     public void visit(Visitor visitor)
     {
-        for (Map.Entry<Long, EntryImpl> entry : _map.entrySet())
+        for (Map.Entry<Long, MessageConsumerAssociationImpl> entry : _map.entrySet())
         {
-            visitor.callback(entry.getKey(), entry.getValue().getMessageInstance());
+            visitor.callback(entry.getKey(), entry.getValue());
         }
         visitor.visitComplete();
     }
 
     public void add(long deliveryTag, MessageInstance message, final ConsumerImpl consumer, final boolean usesCredit)
     {
-        if(_map.put(deliveryTag, new EntryImpl(message, consumer, usesCredit)) == null)
+        if(_map.put(deliveryTag, new MessageConsumerAssociationImpl(message, consumer, usesCredit)) == null)
         {
             _size++;
         }
@@ -148,35 +148,35 @@ class UnacknowledgedMessageMapImpl imple
 
     public MessageInstance get(long key)
     {
-        Entry messageTargetPair = _map.get(key);
-        return messageTargetPair == null ? null : messageTargetPair.getMessageInstance();
+        MessageConsumerAssociation association = _map.get(key);
+        return association == null ? null : association.getMessageInstance();
     }
 
-    public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
+    public Collection<MessageConsumerAssociation> acknowledge(long deliveryTag, boolean multiple)
     {
         if(multiple)
         {
-            Map<Long, Entry> ackedMessageMap = new LinkedHashMap<>();
+            Map<Long, MessageConsumerAssociation> ackedMessageMap = new LinkedHashMap<>();
             collect(deliveryTag, multiple, ackedMessageMap);
             remove(ackedMessageMap.keySet());
-            List<MessageInstance> acknowledged = new ArrayList<>();
-            for (Entry entry : ackedMessageMap.values())
+            List<MessageConsumerAssociation> acknowledged = new ArrayList<>();
+            for (MessageConsumerAssociation messageConsumerAssociation : ackedMessageMap.values())
             {
-                MessageInstance instance = entry.getMessageInstance();
-                if (instance.makeAcquisitionUnstealable(entry.getConsumer()))
+                MessageInstance instance = messageConsumerAssociation.getMessageInstance();
+                if (instance.makeAcquisitionUnstealable(messageConsumerAssociation.getConsumer()))
                 {
-                    acknowledged.add(instance);
+                    acknowledged.add(messageConsumerAssociation);
                 }
             }
             return acknowledged;
         }
         else
         {
-            MessageInstance instance;
-            final Entry entry = remove(deliveryTag, true);
-            if(entry != null && (instance = entry.getMessageInstance()).makeAcquisitionUnstealable(entry.getConsumer()))
+            final MessageConsumerAssociation association = remove(deliveryTag, true);
+            final MessageInstance messageInstance = association.getMessageInstance();
+            if(association != null && messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
             {
-                return Collections.singleton(instance);
+                return Collections.singleton(association);
             }
             else
             {
@@ -186,9 +186,9 @@ class UnacknowledgedMessageMapImpl imple
         }
     }
 
-    private void collect(long key, Map<Long, Entry> msgs)
+    private void collect(long key, Map<Long, MessageConsumerAssociation> msgs)
     {
-        for (Map.Entry<Long, EntryImpl> entry : _map.entrySet())
+        for (Map.Entry<Long, MessageConsumerAssociationImpl> entry : _map.entrySet())
         {
             msgs.put(entry.getKey(), entry.getValue());
             if (entry.getKey() == key)

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1771912&r1=1771911&r2=1771912&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Tue Nov 29 14:53:02 2016
@@ -25,6 +25,9 @@ import static org.mockito.Mockito.when;
 
 import java.util.Collection;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
@@ -32,6 +35,16 @@ import org.apache.qpid.test.utils.QpidTe
 
 public class UnacknowledgedMessageMapTest extends QpidTestCase
 {
+    public static final Function<MessageConsumerAssociation, MessageInstance>
+            MESSAGE_INSTANCE_FUNCTION = new Function<MessageConsumerAssociation, MessageInstance>()
+
+    {
+        @Override
+        public MessageInstance apply(final MessageConsumerAssociation input)
+        {
+            return input.getMessageInstance();
+        }
+    };
     private final ConsumerImpl _consumer = mock(ConsumerImpl.class);
 
     public void testDeletedMessagesCantBeAcknowledged()
@@ -40,12 +53,13 @@ public class UnacknowledgedMessageMapTes
         final int expectedSize = 5;
         MessageInstance[] msgs = populateMap(map,expectedSize);
         assertEquals(expectedSize,map.size());
-        Collection<MessageInstance> acknowledged = map.acknowledge(100, true);
+        Collection<MessageConsumerAssociation> acknowledged = map.acknowledge(100, true);
+        Collection<MessageInstance> acknowledgedMessages = Collections2.transform(acknowledged, MESSAGE_INSTANCE_FUNCTION);
         assertEquals(expectedSize, acknowledged.size());
         assertEquals(0,map.size());
         for(int i = 0; i < expectedSize; i++)
         {
-            assertTrue("Message " + i + " is missing", acknowledged.contains(msgs[i]));
+            assertTrue("Message " + i + " is missing", acknowledgedMessages.contains(msgs[i]));
         }
 
         map = new UnacknowledgedMessageMapImpl(100, mock(CreditRestorer.class));
@@ -58,11 +72,12 @@ public class UnacknowledgedMessageMapTes
 
 
         acknowledged = map.acknowledge(100, true);
+        acknowledgedMessages = Collections2.transform(acknowledged, MESSAGE_INSTANCE_FUNCTION);
         assertEquals(expectedSize-2, acknowledged.size());
         assertEquals(0,map.size());
         for(int i = 0; i < expectedSize; i++)
         {
-            assertEquals(i != 2 && i != 4, acknowledged.contains(msgs[i]));
+            assertEquals(i != 2 && i != 4, acknowledgedMessages.contains(msgs[i]));
         }
 
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org