You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/05 01:26:36 UTC

svn commit: r1564581 [2/2] - in /qpid/branches/java-broker-amqp-1-0-management/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/jav...

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Feb  5 00:26:35 2014
@@ -56,6 +56,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
@@ -67,7 +68,9 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -143,7 +146,7 @@ public class AMQChannel implements AMQSe
     private volatile boolean _rollingBack;
 
     private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
-    private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
+    private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
     private static final
     AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
     private long _createTime = System.currentTimeMillis();
@@ -673,22 +676,13 @@ public class AMQChannel implements AMQSe
      *                    delivery tag)
      * @param subscription The consumer that is to acknowledge this message.
      */
-    public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
+    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Subscription subscription)
     {
         if (_logger.isDebugEnabled())
         {
-            if (entry.getQueue() == null)
-            {
-                _logger.debug("Adding unacked message with a null queue:" + entry);
-            }
-            else
-            {
-                if (_logger.isDebugEnabled())
-                {
                     _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
-                               + ") with a queue(" + entry.getQueue() + ") for " + subscription);
-                }
-            }
+                               + ") for " + subscription);
+
         }
 
         _unacknowledgedMessageMap.add(deliveryTag, entry);
@@ -711,7 +705,7 @@ public class AMQChannel implements AMQSe
     public void requeue() throws AMQException
     {
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
-        Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+        Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
 
         if (!messagesToBeDelivered.isEmpty())
         {
@@ -722,21 +716,13 @@ public class AMQChannel implements AMQSe
 
         }
 
-        for (QueueEntry unacked : messagesToBeDelivered)
+        for (MessageInstance unacked : messagesToBeDelivered)
         {
-            if (!unacked.isQueueDeleted())
-            {
-                // Mark message redelivered
-                unacked.setRedelivered();
-
-                // Ensure message is released for redelivery
-                unacked.release();
+            // Mark message redelivered
+            unacked.setRedelivered();
 
-            }
-            else
-            {
-                unacked.delete();
-            }
+            // Ensure message is released for redelivery
+            unacked.release();
         }
 
     }
@@ -750,7 +736,7 @@ public class AMQChannel implements AMQSe
      */
     public void requeue(long deliveryTag) throws AMQException
     {
-        QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+        MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag);
 
         if (unacked != null)
         {
@@ -758,20 +744,8 @@ public class AMQChannel implements AMQSe
             unacked.setRedelivered();
 
             // Ensure message is released for redelivery
-            if (!unacked.isQueueDeleted())
-            {
-
-                // Ensure message is released for redelivery
-                unacked.release();
+            unacked.release();
 
-            }
-            else
-            {
-                _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
-                          + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
-
-                unacked.delete();
-            }
         }
         else
         {
@@ -784,10 +758,10 @@ public class AMQChannel implements AMQSe
 
     public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
     {
-        final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
         if (queueEntry != null)
         {
-            final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+            final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
             return maximumDeliveryCount > 0;
         }
 
@@ -796,10 +770,10 @@ public class AMQChannel implements AMQSe
 
     public boolean isDeliveredTooManyTimes(final long deliveryTag)
     {
-        final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
         if (queueEntry != null)
         {
-            final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+            final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
             final int numDeliveries = queueEntry.getDeliveryCount();
             return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
         }
@@ -818,8 +792,8 @@ public class AMQChannel implements AMQSe
     {
 
 
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
 
         if (_logger.isDebugEnabled())
         {
@@ -831,9 +805,8 @@ public class AMQChannel implements AMQSe
         // and those that don't to be requeued.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
                                                                     msgToRequeue,
-                                                                    msgToResend,
-                                                                    requeue,
-                                                                    _messageStore));
+                                                                    msgToResend
+        ));
 
 
         // Process Messages to Resend
@@ -849,9 +822,9 @@ public class AMQChannel implements AMQSe
             }
         }
 
-        for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
+        for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet())
         {
-            QueueEntry message = entry.getValue();
+            MessageInstance message = entry.getValue();
             long deliveryTag = entry.getKey();
 
             //Amend the delivery counter as the client hasn't seen these messages yet.
@@ -877,9 +850,9 @@ public class AMQChannel implements AMQSe
         }
 
         // Process Messages to Requeue at the front of the queue
-        for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
+        for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet())
         {
-            QueueEntry message = entry.getValue();
+            MessageInstance message = entry.getValue();
             long deliveryTag = entry.getKey();
 
             //Amend the delivery counter as the client hasn't seen these messages yet.
@@ -905,11 +878,11 @@ public class AMQChannel implements AMQSe
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
     {
-        Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
+        Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
     }
 
-    private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
+    private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple)
     {
 
         return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
@@ -1077,7 +1050,7 @@ public class AMQChannel implements AMQSe
 
         postRollbackTask.run();
 
-        for(QueueEntry entry : _resendList)
+        for(MessageInstance entry : _resendList)
         {
             Subscription sub = entry.getDeliveredSubscription();
             if(sub == null || sub.isClosed())
@@ -1152,7 +1125,7 @@ public class AMQChannel implements AMQSe
     private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
             {
                 addUnacknowledgedMessage(entry, deliveryTag, sub);
             }
@@ -1288,9 +1261,9 @@ public class AMQChannel implements AMQSe
 
     private class MessageAcknowledgeAction implements ServerTransaction.Action
     {
-        private final Collection<QueueEntry> _ackedMessages;
+        private final Collection<MessageInstance> _ackedMessages;
 
-        public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
+        public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages)
         {
             _ackedMessages = ackedMessages;
         }
@@ -1299,7 +1272,7 @@ public class AMQChannel implements AMQSe
         {
             try
             {
-                for(QueueEntry entry : _ackedMessages)
+                for(MessageInstance entry : _ackedMessages)
                 {
                     entry.delete();
                 }
@@ -1322,7 +1295,7 @@ public class AMQChannel implements AMQSe
             {
                 try
                 {
-                    for(QueueEntry entry : _ackedMessages)
+                    for(MessageInstance entry : _ackedMessages)
                     {
                         entry.release();
                     }
@@ -1490,7 +1463,7 @@ public class AMQChannel implements AMQSe
     public void deadLetter(long deliveryTag) throws AMQException
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
-        final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
+        final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
 
         if (rejectedQueueEntry == null)
         {
@@ -1499,6 +1472,7 @@ public class AMQChannel implements AMQSe
         else
         {
             final ServerMessage msg = rejectedQueueEntry.getMessage();
+            final Subscription sub = rejectedQueueEntry.getDeliveredSubscription();
 
             int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>()
                 {
@@ -1512,23 +1486,28 @@ public class AMQChannel implements AMQSe
 
             if(requeues == 0)
             {
-                final AMQQueue queue = rejectedQueueEntry.getQueue();
 
-                final Exchange altExchange = queue.getAlternateExchange();
-
-                if (altExchange == null)
+                final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource();
+                if(owningResource instanceof AMQQueue)
                 {
-                    _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
-                    _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+                    final AMQQueue queue = (AMQQueue) owningResource;
 
-                }
-                else
-                {
-                    _logger.debug(
-                            "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
-                            + deliveryTag);
-                    _actor.message(_logSubject,
-                                   ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+                    final Exchange altExchange = queue.getAlternateExchange();
+
+                    if (altExchange == null)
+                    {
+                        _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+                        _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+
+                    }
+                    else
+                    {
+                        _logger.debug(
+                                "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+                                + deliveryTag);
+                        _actor.message(_logSubject,
+                                       ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+                    }
                 }
             }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Wed Feb  5 00:26:35 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription;
@@ -35,26 +36,20 @@ public class ExtractResendAndRequeue imp
 {
     private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
 
-    private final Map<Long, QueueEntry> _msgToRequeue;
-    private final Map<Long, QueueEntry> _msgToResend;
-    private final boolean _requeueIfUnableToResend;
+    private final Map<Long, MessageInstance> _msgToRequeue;
+    private final Map<Long, MessageInstance> _msgToResend;
     private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
-    private final MessageStore _transactionLog;
 
     public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
-                                   Map<Long, QueueEntry> msgToRequeue,
-                                   Map<Long, QueueEntry> msgToResend,
-                                   boolean requeueIfUnableToResend,
-                                   MessageStore txnLog)
+                                   Map<Long, MessageInstance> msgToRequeue,
+                                   Map<Long, MessageInstance> msgToResend)
     {
         _unacknowledgedMessageMap = unacknowledgedMessageMap;
         _msgToRequeue = msgToRequeue;
         _msgToResend = msgToResend;
-        _requeueIfUnableToResend = requeueIfUnableToResend;
-        _transactionLog = txnLog;
     }
 
-    public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+    public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException
     {
 
         message.setRedelivered();
@@ -73,58 +68,13 @@ public class ExtractResendAndRequeue imp
         }
         else
         {
-            // Message has no consumer tag, so was "delivered" to a GET
-            // or consumer no longer registered
-            // cannot resend, so re-queue.
-            if (!message.isQueueDeleted())
-            {
-                if (_requeueIfUnableToResend)
-                {
-                    _msgToRequeue.put(deliveryTag, message);
-                }
-                else
-                {
-
-                    dequeueEntry(message);
-                    _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
-                }
-            }
-            else
-            {
-                dequeueEntry(message);
-                _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
-            }
+            _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
         }
 
         // false means continue processing
         return false;
     }
 
-
-    private void dequeueEntry(final QueueEntry node)
-    {
-        ServerTransaction txn = new AutoCommitTransaction(_transactionLog);
-        dequeueEntry(node, txn);
-    }
-
-    private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
-    {
-        txn.dequeue(node.getQueue(), node.getMessage(),
-                    new ServerTransaction.Action()
-                    {
-
-                        public void postCommit()
-                        {
-                            node.delete();
-                        }
-
-                        public void onRollback()
-                        {
-
-                        }
-                    });
-    }
-
     public void visitComplete()
     {
         _unacknowledgedMessageMap.clear();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java Wed Feb  5 00:26:35 2014
@@ -20,10 +20,10 @@
 */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.subscription.Subscription;
 
 public interface RecordDeliveryMethod
 {
-    void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag);
+    void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag);
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java Wed Feb  5 00:26:35 2014
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -102,7 +103,7 @@ public abstract class SubscriptionTarget
          * @throws org.apache.qpid.AMQException
          */
         @Override
-        public void send(QueueEntry entry, boolean batch) throws AMQException
+        public void send(MessageInstance entry, boolean batch) throws AMQException
         {
             // We don't decrement the reference here as we don't want to consume the message
             // but we do want to send it to the client.
@@ -165,7 +166,7 @@ public abstract class SubscriptionTarget
          * @throws org.apache.qpid.AMQException
          */
         @Override
-        public void send(QueueEntry entry, boolean batch) throws AMQException
+        public void send(MessageInstance entry, boolean batch) throws AMQException
         {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately.
@@ -176,7 +177,7 @@ public abstract class SubscriptionTarget
 
             // The send may of course still fail, in which case, as
             // the message is unacked, it will be lost.
-            _txn.dequeue(entry.getQueue(), entry.getMessage(), NOOP);
+            _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP);
 
             ServerMessage message = entry.getMessage();
             MessageReference ref = message.newReference();
@@ -281,7 +282,7 @@ public abstract class SubscriptionTarget
          * @throws org.apache.qpid.AMQException
          */
         @Override
-        public void send(QueueEntry entry, boolean batch) throws AMQException
+        public void send(MessageInstance entry, boolean batch) throws AMQException
         {
 
 
@@ -492,7 +493,7 @@ public abstract class SubscriptionTarget
     }
 
 
-    protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
+    protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
     {
         _recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag);
     }
@@ -520,9 +521,9 @@ public abstract class SubscriptionTarget
         _channel.getProtocolSession().flushBatched();
     }
 
-    protected void addUnacknowledgedMessage(QueueEntry entry)
+    protected void addUnacknowledgedMessage(MessageInstance entry)
     {
-        final long size = entry.getSize();
+        final long size = entry.getMessage().getSize();
         _unacknowledgedBytes.addAndGet(size);
         _unacknowledgedCount.incrementAndGet();
         entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>()

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Wed Feb  5 00:26:35 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.QueueEntry;
 
 import java.util.Collection;
@@ -36,24 +37,24 @@ public interface UnacknowledgedMessageMa
          *@param message the message being iterated over @return true to stop iteration, false to continue
          * @throws AMQException
          */
-        boolean callback(final long deliveryTag, QueueEntry message) throws AMQException;
+        boolean callback(final long deliveryTag, MessageInstance message) throws AMQException;
 
         void visitComplete();
     }
 
     void visit(Visitor visitor) throws AMQException;
 
-    void add(long deliveryTag, QueueEntry message);
+    void add(long deliveryTag, MessageInstance message);
 
-    QueueEntry remove(long deliveryTag);
+    MessageInstance remove(long deliveryTag);
 
-    Collection<QueueEntry> cancelAllMessages();
+    Collection<MessageInstance> cancelAllMessages();
 
     int size();
 
     void clear();
 
-    QueueEntry get(long deliveryTag);
+    MessageInstance get(long deliveryTag);
 
     /**
      * Get the set of delivery tags that are outstanding.
@@ -62,7 +63,7 @@ public interface UnacknowledgedMessageMa
      */
     Set<Long> getDeliveryTags();
 
-    Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+    Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
 
 }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Wed Feb  5 00:26:35 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.QueueEntry;
 
 import java.util.Collection;
@@ -34,7 +35,7 @@ public class UnacknowledgedMessageMapImp
 
     private long _unackedSize;
 
-    private Map<Long, QueueEntry> _map;
+    private Map<Long, MessageInstance> _map;
 
     private long _lastDeliveryTag;
 
@@ -43,10 +44,10 @@ public class UnacknowledgedMessageMapImp
     public UnacknowledgedMessageMapImpl(int prefetchLimit)
     {
         _prefetchLimit = prefetchLimit;
-        _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
+        _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
     }
 
-    public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
+    public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
     {
         if (multiple)
         {
@@ -54,7 +55,7 @@ public class UnacknowledgedMessageMapImp
         }
         else
         {
-            final QueueEntry entry = get(deliveryTag);
+            final MessageInstance entry = get(deliveryTag);
             if(entry != null)
             {
                 msgs.put(deliveryTag, entry);
@@ -63,7 +64,7 @@ public class UnacknowledgedMessageMapImp
 
     }
 
-    public void remove(Map<Long,QueueEntry> msgs)
+    public void remove(Map<Long,MessageInstance> msgs)
     {
         synchronized (_lock)
         {
@@ -74,12 +75,12 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public QueueEntry remove(long deliveryTag)
+    public MessageInstance remove(long deliveryTag)
     {
         synchronized (_lock)
         {
 
-            QueueEntry message = _map.remove(deliveryTag);
+            MessageInstance message = _map.remove(deliveryTag);
             if(message != null)
             {
                 _unackedSize -= message.getMessage().getSize();
@@ -94,8 +95,8 @@ public class UnacknowledgedMessageMapImp
     {
         synchronized (_lock)
         {
-            Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
-            for (Map.Entry<Long, QueueEntry> entry : currentEntries)
+            Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet();
+            for (Map.Entry<Long, MessageInstance> entry : currentEntries)
             {
                 visitor.callback(entry.getKey().longValue(), entry.getValue());
             }
@@ -103,7 +104,7 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public void add(long deliveryTag, QueueEntry message)
+    public void add(long deliveryTag, MessageInstance message)
     {
         synchronized (_lock)
         {
@@ -113,12 +114,12 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public Collection<QueueEntry> cancelAllMessages()
+    public Collection<MessageInstance> cancelAllMessages()
     {
         synchronized (_lock)
         {
-            Collection<QueueEntry> currentEntries = _map.values();
-            _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
+            Collection<MessageInstance> currentEntries = _map.values();
+            _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
             _unackedSize = 0l;
             return currentEntries;
         }
@@ -141,7 +142,7 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public QueueEntry get(long key)
+    public MessageInstance get(long key)
     {
         synchronized (_lock)
         {
@@ -157,19 +158,19 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+    public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
     {
-        Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+        Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
         collect(deliveryTag, multiple, ackedMessageMap);
         remove(ackedMessageMap);
         return ackedMessageMap.values();
     }
 
-    private void collect(long key, Map<Long, QueueEntry> msgs)
+    private void collect(long key, Map<Long, MessageInstance> msgs)
     {
         synchronized (_lock)
         {
-            for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
+            for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
             {
                 msgs.put(entry.getKey(),entry.getValue());
                 if (entry.getKey() == key)

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Wed Feb  5 00:26:35 2014
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.BasicGetE
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.flow.FlowCreditManager;
@@ -149,7 +150,7 @@ public class BasicGetMethodHandler imple
         final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
             {
                 channel.addUnacknowledgedMessage(entry, deliveryTag, null);
             }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java Wed Feb  5 00:26:35 2014
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -65,7 +66,7 @@ public class BasicRejectMethodHandler im
 
         long deliveryTag = body.getDeliveryTag();
 
-        QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+        MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
 
         if (message == null)
         {
@@ -73,16 +74,6 @@ public class BasicRejectMethodHandler im
         }
         else
         {
-            if (message.isQueueDeleted())
-            {
-                _logger.warn("Message's Queue has already been purged, dropping message");
-                message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
-                if(message != null)
-                {
-                    message.delete();
-                }
-                return;
-            }
 
             if (message.getMessage() == null)
             {
@@ -100,11 +91,11 @@ public class BasicRejectMethodHandler im
 
             if (body.getRequeue())
             {
-                channel.requeue(deliveryTag);
-
                 //this requeue represents a message rejected from the pre-dispatch queue
                 //therefore we need to amend the delivery counter.
                 message.decrementDeliveryCount();
+
+                channel.requeue(deliveryTag);
             }
             else
             {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Wed Feb  5 00:26:35 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.flow.LimitlessCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoredMessage;
@@ -194,8 +195,8 @@ public class AckTest extends QpidTestCas
         {
             assertTrue(deliveryTag == i);
             i++;
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
         }
 
     }
@@ -275,8 +276,8 @@ public class AckTest extends QpidTestCas
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i);
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
             // 5 is the delivery tag of the message that *should* be removed
             if (++i == 5)
             {
@@ -314,8 +315,8 @@ public class AckTest extends QpidTestCas
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i + 5);
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
             ++i;
         }
     }
@@ -346,8 +347,8 @@ public class AckTest extends QpidTestCas
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i + 5);
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
             ++i;
         }
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Wed Feb  5 00:26:35 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -63,7 +64,6 @@ public class ExtractResendAndRequeueTest
     private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
     private static final int INITIAL_MSG_COUNT = 10;
     private AMQQueue _queue;
-    private MessageStore _messageStore = new TestMemoryMessageStore();
     private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
     private Subscription _subscription;
     private boolean _queueDeleted;
@@ -141,12 +141,12 @@ public class ExtractResendAndRequeueTest
         //We don't need the subscription object here.
         acquireMessages(_referenceList);
 
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
 
         // requeueIfUnableToResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, _messageStore));
+                                                                    msgToResend));
 
         assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -170,94 +170,17 @@ public class ExtractResendAndRequeueTest
         // Close subscription
         when(_subscription.isClosed()).thenReturn(true);
 
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
 
         // requeueIfUnableToResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, _messageStore));
+                                                                    msgToResend));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
         assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
     }
 
-    /**
-     * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued
-     * requeueIfUnableToResend(set to true) then all messages should be sent to the msgToRequeue map.
-     *
-     * @throws AMQException the visit interface throws this
-     */
-
-    public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException
-    {
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
-        // requeueIfUnableToResend = true so all messages should go to msgToRequeue
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, _messageStore));
-
-        assertEquals("Message count for resend not correct.", 0, msgToResend.size());
-        assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-    }
-
-    /**
-     * If the subscription is null, due to message being retrieved via a GET, And we request that we don't
-     * requeueIfUnableToResend(set to false) then all messages should be dropped as we do not have a dead letter queue.
-     *
-     * @throws AMQException the visit interface throws this
-     */
-
-    public void testDrop() throws AMQException
-    {
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
-        // requeueIfUnableToResend = false so all messages should be dropped all maps should be empty
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, _messageStore));
-
-        assertEquals("Message count for resend not correct.", 0, msgToResend.size());
-        assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-
-        for (QueueEntry entry : _referenceList)
-        {
-            assertTrue("Message was not discarded", entry.isDeleted());
-        }
-
-    }
-
-    /**
-     * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was
-     * delivered has been deleted then it is not possible to requeue. Currently we simply discard the message but in the
-     * future we may wish to dead letter the message.
-     *
-     * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted
-     *
-     * @throws AMQException the visit interface throws this
-     */
-    public void testDiscard() throws AMQException
-    {
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
-        _queueDeleted = true;
-        // requeueIfUnableToResend : value doesn't matter here as queue has been deleted
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, _messageStore));
-
-        assertEquals("Message count for resend not correct.", 0, msgToResend.size());
-        assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-        for (QueueEntry entry : _referenceList)
-        {
-            assertTrue("Message was not discarded", entry.isDeleted());
-        }
-    }
 
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Wed Feb  5 00:26:35 2014
@@ -65,6 +65,7 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -85,14 +86,14 @@ public class SendingLink_1_0 implements 
     private SubscriptionTarget_1_0 _target;
 
     private boolean _draining;
-    private final Map<Binary, QueueEntry> _unsettledMap =
-            new HashMap<Binary, QueueEntry>();
+    private final Map<Binary, MessageInstance> _unsettledMap =
+            new HashMap<Binary, MessageInstance>();
 
     private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
             new ConcurrentHashMap<Binary, UnsettledAction>();
     private volatile SendingLinkAttachment _linkAttachment;
     private TerminusDurability _durability;
-    private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+    private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
     private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
     private Runnable _closeAction;
     private final AMQQueue _queue;
@@ -559,7 +560,7 @@ public class SendingLink_1_0 implements 
         }
     }
 
-    public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+    public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry)
     {
         _unsettledActionMap.put(tag,unsettledAction);
         if(getTransactionId() == null)
@@ -631,14 +632,14 @@ public class SendingLink_1_0 implements 
         SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
         endpoint.setDeliveryStateHandler(this);
         Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
-        Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+        Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
         _resumeAcceptedTransfers.clear();
         _resumeFullTransfers.clear();
 
-        for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+        for(Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
         {
             Binary deliveryTag = entry.getKey();
-            final QueueEntry queueEntry = entry.getValue();
+            final MessageInstance queueEntry = entry.getValue();
             if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
             {
                 queueEntry.setRedelivered();
@@ -706,9 +707,9 @@ public class SendingLink_1_0 implements 
 
     public Map getUnsettledOutcomeMap()
     {
-        Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+        Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap);
 
-        for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+        for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
         {
             entry.setValue(null);
         }
@@ -720,4 +721,9 @@ public class SendingLink_1_0 implements 
     {
         _closeAction = action;
     }
+
+    public VirtualHost getVirtualHost()
+    {
+        return _vhost;
+    }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java?rev=1564581&r1=1564580&r2=1564581&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java Wed Feb  5 00:26:35 2014
@@ -38,6 +38,7 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
 import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -112,7 +113,7 @@ class SubscriptionTarget_1_0 extends Abs
         }
     }
 
-    public void send(QueueEntry entry, boolean batch) throws AMQException
+    public void send(MessageInstance entry, boolean batch) throws AMQException
     {
         // TODO
         send(entry);
@@ -123,7 +124,7 @@ class SubscriptionTarget_1_0 extends Abs
         // TODO
     }
 
-    public void send(final QueueEntry queueEntry) throws AMQException
+    public void send(final MessageInstance queueEntry) throws AMQException
     {
         ServerMessage serverMessage = queueEntry.getMessage();
         Message_1_0 message;
@@ -134,7 +135,7 @@ class SubscriptionTarget_1_0 extends Abs
         else
         {
             final MessageConverter converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
-            message = (Message_1_0) converter.convert(serverMessage, queueEntry.getQueue().getVirtualHost());
+            message = (Message_1_0) converter.convert(serverMessage, _link.getVirtualHost());
         }
 
         Transfer transfer = new Transfer();
@@ -344,10 +345,10 @@ class SubscriptionTarget_1_0 extends Abs
     private class DispositionAction implements UnsettledAction
     {
 
-        private final QueueEntry _queueEntry;
+        private final MessageInstance _queueEntry;
         private final Binary _deliveryTag;
 
-        public DispositionAction(Binary tag, QueueEntry queueEntry)
+        public DispositionAction(Binary tag, MessageInstance queueEntry)
         {
             _deliveryTag = tag;
             _queueEntry = queueEntry;
@@ -378,7 +379,7 @@ class SubscriptionTarget_1_0 extends Abs
 
             if(outcome instanceof Accepted)
             {
-                txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(),
+                txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
                         new ServerTransaction.Action()
                         {
 
@@ -469,7 +470,7 @@ class SubscriptionTarget_1_0 extends Abs
     private class DoNothingAction implements UnsettledAction
     {
         public DoNothingAction(final Binary tag,
-                               final QueueEntry queueEntry)
+                               final MessageInstance queueEntry)
         {
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org